// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#![warn(clippy::all)]
//! Test SQL syntax specific to PostgreSQL. The parser based on the
//! generic dialect is also tested (on the inputs it can handle).

#[macro_use]
mod test_utils;

use helpers::attached_token::AttachedToken;
use sqlparser::ast::{
    DataType, DropBehavior, DropOperator, DropOperatorClass, DropOperatorSignature,
};
use sqlparser::tokenizer::Span;
use test_utils::*;

use sqlparser::ast::*;
use sqlparser::dialect::{GenericDialect, PostgreSqlDialect};
use sqlparser::parser::ParserError;

/// Exercises `GENERATED { ALWAYS | BY DEFAULT } AS IDENTITY` column definitions
/// (with and without a parenthesised sequence-options list) plus a
/// `GENERATED ALWAYS AS (expr) STORED` column, using the PostgreSQL dialect.
#[test]
fn parse_create_table_generated_always_as_identity() {
    // (input, expected canonical text) pairs for identity columns that carry
    // no sequence-options list: first with a primary key, then without one.
    let plain_identity_cases = [
        (
            "CREATE TABLE table2 (\n    column21 bigint primary key generated always as identity    ,\n    column30 text );",
            "CREATE TABLE table2 (column21 BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, column30 TEXT)",
        ),
        (
            "CREATE TABLE table2 (\n    column21 bigint primary key generated by default as identity    ,\n    column30 text );",
            "CREATE TABLE table2 (column21 BIGINT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, column30 TEXT)",
        ),
        (
            "CREATE TABLE table2 (\n    column22 bigint generated always as identity    ,\n    column30 text );",
            "CREATE TABLE table2 (column22 BIGINT GENERATED ALWAYS AS IDENTITY, column30 TEXT)",
        ),
        (
            "CREATE TABLE table2 (\n    column22 bigint generated by default as identity    ,\n    column30 text );",
            "CREATE TABLE table2 (column22 BIGINT GENERATED BY DEFAULT AS IDENTITY, column30 TEXT)",
        ),
    ];
    for &(input, canonical) in plain_identity_cases.iter() {
        pg().one_statement_parses_to(input, canonical);
    }

    // Column number and the sequence options placed between `( ` and ` )`;
    // each entry drops one more trailing option than the previous one.
    let sequence_options = [
        (
            23,
            "INCREMENT 1 MINVALUE 1 MAXVALUE 20 START WITH 10 CACHE 2 NO CYCLE",
        ),
        (
            24,
            "INCREMENT 1 MINVALUE 1 MAXVALUE 20 START WITH 10 CACHE 2 CYCLE",
        ),
        (25, "INCREMENT 1 MINVALUE 1 MAXVALUE 20 START WITH 10 CACHE 2"),
        (26, "INCREMENT 1 MINVALUE 1 MAXVALUE 20 START WITH 10"),
        (27, "INCREMENT 1 MINVALUE 1 MAXVALUE 20"),
        (28, "INCREMENT 1 MINVALUE 1"),
        (29, "INCREMENT 1"),
    ];
    // Builds the lower-case input and the upper-case canonical text for every
    // options entry under the given generation mode and checks each pair.
    let check_with_options = |mode_in: &str, mode_out: &str| {
        for &(column, options) in sequence_options.iter() {
            let input = format!(
                "CREATE TABLE table2 (\n    column{column} bigint generated {mode_in} as identity ( {options} ),\n    column30 text );"
            );
            let canonical = format!(
                "CREATE TABLE table2 (column{column} BIGINT GENERATED {mode_out} AS IDENTITY ( {options} ), column30 TEXT)"
            );
            pg().one_statement_parses_to(&input, &canonical);
        }
    };

    check_with_options("by default", "BY DEFAULT");

    // The option-less `generated always` case (without primary key) is
    // checked a second time before the ALWAYS variants.
    let (input, canonical) = plain_identity_cases[2];
    pg().one_statement_parses_to(input, canonical);

    check_with_options("always", "ALWAYS");

    // Generated (computed) column; the second column line of the input
    // starts with a tab character.
    pg().one_statement_parses_to(
        "CREATE TABLE table2 (\n    priceInDollar numeric,\n\tprinceInPound numeric GENERATED ALWAYS AS (priceInDollar * 0.22) STORED,\n    column30 text );",
        "CREATE TABLE table2 (priceInDollar NUMERIC, princeInPound NUMERIC GENERATED ALWAYS AS (priceInDollar * 0.22) STORED, column30 TEXT)",
    );
}

/// `CREATE SEQUENCE` variants: TEMPORARY, IF NOT EXISTS, data type, INCREMENT,
/// MINVALUE/MAXVALUE, START WITH, CACHE, CYCLE, OWNED BY and signed values.
#[test]
fn parse_create_sequence() {
    // Each entry is (input SQL, expected canonical text).
    let round_trips = [
        ("CREATE SEQUENCE  name0", "CREATE SEQUENCE name0"),
        (
            "CREATE SEQUENCE  IF NOT EXISTS  name0",
            "CREATE SEQUENCE IF NOT EXISTS name0",
        ),
        (
            "CREATE TEMPORARY SEQUENCE  IF NOT EXISTS  name0",
            "CREATE TEMPORARY SEQUENCE IF NOT EXISTS name0",
        ),
        (
            "CREATE TEMPORARY SEQUENCE  name0",
            "CREATE TEMPORARY SEQUENCE name0",
        ),
        (
            "CREATE TEMPORARY SEQUENCE IF NOT EXISTS  name1
      AS BIGINT
     INCREMENT BY  1
     MINVALUE 1  MAXVALUE 20
     START WITH 10",
            "CREATE TEMPORARY SEQUENCE IF NOT EXISTS name1 AS BIGINT INCREMENT BY 1 MINVALUE 1 MAXVALUE 20 START WITH 10",
        ),
        (
            "CREATE SEQUENCE IF NOT EXISTS  name2
     AS BIGINT
     INCREMENT  1
     MINVALUE 1  MAXVALUE 20
     START WITH 10 CACHE 2 NO CYCLE",
            "CREATE SEQUENCE IF NOT EXISTS name2 AS BIGINT INCREMENT 1 MINVALUE 1 MAXVALUE 20 START WITH 10 CACHE 2 NO CYCLE",
        ),
        (
            "CREATE TEMPORARY SEQUENCE  IF NOT EXISTS  name3
         INCREMENT  1
     NO MINVALUE  MAXVALUE 20 CACHE 2 CYCLE",
            "CREATE TEMPORARY SEQUENCE IF NOT EXISTS name3 INCREMENT 1 NO MINVALUE MAXVALUE 20 CACHE 2 CYCLE",
        ),
        (
            "CREATE TEMPORARY SEQUENCE  IF NOT EXISTS  name3
         INCREMENT  1
     NO MINVALUE  MAXVALUE 20 OWNED BY public.table01",
            "CREATE TEMPORARY SEQUENCE IF NOT EXISTS name3 INCREMENT 1 NO MINVALUE MAXVALUE 20 OWNED BY public.table01",
        ),
        (
            "CREATE TEMPORARY SEQUENCE  IF NOT EXISTS  name3
         INCREMENT  1
     NO MINVALUE  MAXVALUE 20 OWNED BY NONE",
            "CREATE TEMPORARY SEQUENCE IF NOT EXISTS name3 INCREMENT 1 NO MINVALUE MAXVALUE 20 OWNED BY NONE",
        ),
        // Negative values, with and without whitespace after the sign.
        (
            "CREATE SEQUENCE name4
    AS BIGINT
    INCREMENT   -15
    MINVALUE - 2000  MAXVALUE -50
    START WITH -   60",
            "CREATE SEQUENCE name4 AS BIGINT INCREMENT -15 MINVALUE -2000 MAXVALUE -50 START WITH -60",
        ),
        // Explicitly positive values, with and without whitespace after the sign.
        (
            "CREATE SEQUENCE name5
    AS BIGINT
    INCREMENT   +10
    MINVALUE + 30  MAXVALUE +5000
    START WITH +   45",
            "CREATE SEQUENCE name5 AS BIGINT INCREMENT +10 MINVALUE +30 MAXVALUE +5000 START WITH +45",
        ),
    ];
    for (input, canonical) in round_trips {
        pg().one_statement_parses_to(input, canonical);
    }

    // A statement that ends in a bare `NO` must be rejected with a parser error.
    let dangling_no = pg().parse_sql_statements("CREATE SEQUENCE foo INCREMENT 1 NO MINVALUE NO");
    assert!(matches!(dangling_no, Err(ParserError::ParserError(_))));
}

/// `DROP SEQUENCE` with and without IF EXISTS, CASCADE/RESTRICT, and with a
/// comma-separated list of names.
#[test]
fn parse_drop_sequence() {
    // Each entry is (input SQL, expected canonical text).
    let round_trips = [
        (
            "DROP SEQUENCE IF EXISTS  name0 CASCADE",
            "DROP SEQUENCE IF EXISTS name0 CASCADE",
        ),
        (
            "DROP SEQUENCE IF EXISTS  name1 RESTRICT",
            "DROP SEQUENCE IF EXISTS name1 RESTRICT",
        ),
        ("DROP SEQUENCE  name2 CASCADE", "DROP SEQUENCE name2 CASCADE"),
        ("DROP SEQUENCE  name2", "DROP SEQUENCE name2"),
        ("DROP SEQUENCE  name0 CASCADE", "DROP SEQUENCE name0 CASCADE"),
        (
            "DROP SEQUENCE  name1 RESTRICT",
            "DROP SEQUENCE name1 RESTRICT",
        ),
        (
            "DROP SEQUENCE  name1, name2, name3",
            "DROP SEQUENCE name1, name2, name3",
        ),
    ];
    for (input, canonical) in round_trips {
        pg().one_statement_parses_to(input, canonical);
    }
}

/// Parses a `CREATE TABLE` with defaults, NOT NULL, a quoted collation and a
/// `WITH (...)` option list, then compares the resulting columns and options
/// against hand-built AST values.
#[test]
fn parse_create_table_with_defaults() {
    let sql = "CREATE TABLE public.customer (
            customer_id integer DEFAULT nextval(public.customer_customer_id_seq),
            store_id smallint NOT NULL,
            first_name character varying(45) NOT NULL,
            last_name character varying(45) COLLATE \"es_ES\" NOT NULL,
            email character varying(50),
            address_id smallint NOT NULL,
            activebool boolean DEFAULT true NOT NULL,
            create_date date DEFAULT now()::text NOT NULL,
            last_update timestamp without time zone DEFAULT now() NOT NULL,
            active int NOT NULL
    ) WITH (fillfactor = 20, user_catalog_table = true, autovacuum_vacuum_threshold = 100)";

    // Small builders that keep the expected values below readable.
    let unnamed = |option| ColumnOptionDef { name: None, option };
    let not_null = || unnamed(ColumnOption::NotNull);
    let default_to = |expr| unnamed(ColumnOption::Default(expr));
    let char_varying = |length| {
        DataType::CharacterVarying(Some(CharacterLength::IntegerLength { length, unit: None }))
    };
    let column = |name: &str, data_type, options| ColumnDef {
        name: name.into(),
        data_type,
        options,
    };
    let key_value = |key: &str, value| SqlOption::KeyValue {
        key: key.into(),
        value,
    };

    let parsed = pg_and_generic().one_statement_parses_to(sql, "");
    match parsed {
        Statement::CreateTable(CreateTable {
            name,
            columns,
            constraints,
            table_options,
            if_not_exists: false,
            external: false,
            file_format: None,
            location: None,
            ..
        }) => {
            use pretty_assertions::assert_eq;
            assert_eq!("public.customer", name.to_string());

            let expected_columns = vec![
                column(
                    "customer_id",
                    DataType::Integer(None),
                    vec![default_to(
                        pg().verified_expr("nextval(public.customer_customer_id_seq)"),
                    )],
                ),
                column("store_id", DataType::SmallInt(None), vec![not_null()]),
                column("first_name", char_varying(45), vec![not_null()]),
                column(
                    "last_name",
                    char_varying(45),
                    vec![
                        unnamed(ColumnOption::Collation(ObjectName::from(vec![
                            Ident::with_quote('"', "es_ES"),
                        ]))),
                        not_null(),
                    ],
                ),
                column("email", char_varying(50), vec![]),
                column("address_id", DataType::SmallInt(None), vec![not_null()]),
                column(
                    "activebool",
                    DataType::Boolean,
                    vec![
                        default_to(Expr::Value(Value::Boolean(true).with_empty_span())),
                        not_null(),
                    ],
                ),
                column(
                    "create_date",
                    DataType::Date,
                    vec![
                        default_to(pg().verified_expr("now()::TEXT")),
                        not_null(),
                    ],
                ),
                column(
                    "last_update",
                    DataType::Timestamp(None, TimezoneInfo::WithoutTimeZone),
                    vec![default_to(pg().verified_expr("now()")), not_null()],
                ),
                column("active", DataType::Int(None), vec![not_null()]),
            ];
            assert_eq!(columns, expected_columns);
            assert!(constraints.is_empty());

            // The options must have been captured as a `WITH (...)` list.
            if let CreateTableOptions::With(with_options) = table_options {
                let expected_options = vec![
                    key_value("fillfactor", Expr::value(number("20"))),
                    key_value(
                        "user_catalog_table",
                        Expr::Value(Value::Boolean(true).with_empty_span()),
                    ),
                    key_value("autovacuum_vacuum_threshold", Expr::value(number("100"))),
                ];
                assert_eq!(with_options, expected_options);
            } else {
                unreachable!()
            }
        }
        _ => unreachable!(),
    }
}

/// Feeds a `CREATE TABLE` statement written in lower-case, multi-line form
/// (quoted `regclass` cast, array column, chained casts, schema-qualified
/// type) to the PostgreSQL dialect and checks the expected canonical text.
#[test]
fn parse_create_table_from_pg_dump() {
    let dumped = "CREATE TABLE public.customer (
            customer_id integer DEFAULT nextval('public.customer_customer_id_seq'::regclass) NOT NULL,
            store_id smallint NOT NULL,
            first_name character varying(45) NOT NULL,
            last_name character varying(45) NOT NULL,
            info text[],
            address_id smallint NOT NULL,
            activebool boolean DEFAULT true NOT NULL,
            create_date date DEFAULT now()::DATE NOT NULL,
            create_date1 date DEFAULT 'now'::TEXT::date NOT NULL,
            last_update timestamp without time zone DEFAULT now(),
            release_year public.year,
            active int
        )";
    // Single-line form: every `\` continuation swallows the line break and
    // the indentation that follows it.
    let canonical = "CREATE TABLE public.customer (\
            customer_id INTEGER DEFAULT nextval('public.customer_customer_id_seq'::REGCLASS) NOT NULL, \
            store_id SMALLINT NOT NULL, \
            first_name CHARACTER VARYING(45) NOT NULL, \
            last_name CHARACTER VARYING(45) NOT NULL, \
            info TEXT[], \
            address_id SMALLINT NOT NULL, \
            activebool BOOLEAN DEFAULT true NOT NULL, \
            create_date DATE DEFAULT now()::DATE NOT NULL, \
            create_date1 DATE DEFAULT 'now'::TEXT::DATE NOT NULL, \
            last_update TIMESTAMP WITHOUT TIME ZONE DEFAULT now(), \
            release_year public.year, \
            active INT\
        )";
    pg().one_statement_parses_to(dumped, canonical);
}

/// Verifies a `CREATE TABLE` with PRIMARY KEY, UNIQUE, DEFAULT and array
/// columns against the PostgreSQL dialect.
// NOTE(review): despite the test name, the statement below contains no
// INHERITS clause.
#[test]
fn parse_create_table_with_inherit() {
    let create_settings = concat!(
        "CREATE TABLE bazaar.settings (",
        "settings_id UUID PRIMARY KEY DEFAULT uuid_generate_v4() NOT NULL, ",
        "user_id UUID UNIQUE, ",
        "value TEXT[], ",
        "use_metric BOOLEAN DEFAULT true",
        ")",
    );
    pg().verified_stmt(create_settings);
}

#[test]
fn parse_create_table_empty() {
    // A table with no columns looks odd, but at least PostgreSQL accepts it.
    // <https://github.com/sqlparser-rs/sqlparser-rs/pull/94>
    let zero_columns = "CREATE TABLE t ()";
    pg_and_generic().verified_stmt(zero_columns);
}

#[test]
fn parse_create_table_constraints_only() {
    // PostgreSQL also allows a column-less table to carry constraints.
    let parsed = pg_and_generic().verified_stmt("CREATE TABLE t (CONSTRAINT positive CHECK (2 > 1))");
    if let Statement::CreateTable(CreateTable {
        name,
        columns,
        constraints,
        ..
    }) = parsed
    {
        assert_eq!("t", name.to_string());
        assert!(columns.is_empty());
        // Exactly one constraint is expected; compare its rendered text.
        let rendered = only(constraints).to_string();
        assert_eq!(rendered, "CONSTRAINT positive CHECK (2 > 1)");
    } else {
        unreachable!()
    }
}

/// `ALTER TABLE ... RENAME CONSTRAINT old TO new` must yield a
/// `RenameConstraint` operation carrying both names.
#[test]
fn parse_alter_table_constraints_rename() {
    let stmt = pg().verified_stmt("ALTER TABLE tab RENAME CONSTRAINT old_name TO new_name");
    if let AlterTableOperation::RenameConstraint { old_name, new_name } = alter_table_op(stmt) {
        assert_eq!(old_name.to_string(), "old_name");
        assert_eq!(new_name.to_string(), "new_name");
    } else {
        unreachable!()
    }
}

/// `ADD CONSTRAINT ... UNIQUE` with `NULLS NOT DISTINCT`, `NULLS DISTINCT`
/// and no NULLS clause; only the first form has its AST inspected.
#[test]
fn parse_alter_table_constraints_unique_nulls_distinct() {
    let stmt = pg_and_generic()
        .verified_stmt("ALTER TABLE t ADD CONSTRAINT b UNIQUE NULLS NOT DISTINCT (c)");
    let alter_table = match stmt {
        Statement::AlterTable(alter_table) => alter_table,
        _ => unreachable!(),
    };
    // The first operation must be an added UNIQUE constraint.
    match &alter_table.operations[0] {
        AlterTableOperation::AddConstraint {
            constraint: TableConstraint::Unique(unique),
            ..
        } => {
            assert_eq!(&unique.nulls_distinct, &NullsDistinctOption::NotDistinct);
        }
        _ => unreachable!(),
    }

    for sql in [
        "ALTER TABLE t ADD CONSTRAINT b UNIQUE NULLS DISTINCT (c)",
        "ALTER TABLE t ADD CONSTRAINT b UNIQUE (c)",
    ] {
        pg_and_generic().verified_stmt(sql);
    }
}

/// `ALTER TABLE ... DISABLE` for row level security and for a named rule.
#[test]
fn parse_alter_table_disable() {
    let statements = [
        "ALTER TABLE tab DISABLE ROW LEVEL SECURITY",
        "ALTER TABLE tab DISABLE RULE rule_name",
    ];
    for sql in statements {
        pg_and_generic().verified_stmt(sql);
    }
}

/// `ALTER TABLE ... DISABLE TRIGGER` with `ALL`, `USER` and a trigger name.
#[test]
fn parse_alter_table_disable_trigger() {
    for target in ["ALL", "USER", "trigger_name"] {
        let sql = format!("ALTER TABLE tab DISABLE TRIGGER {target}");
        pg_and_generic().verified_stmt(&sql);
    }
}

/// `ALTER TABLE ... ENABLE` for rules, triggers (plain, ALWAYS, REPLICA) and
/// row level security.
#[test]
fn parse_alter_table_enable() {
    let statements = [
        "ALTER TABLE tab ENABLE ALWAYS RULE rule_name",
        "ALTER TABLE tab ENABLE ALWAYS TRIGGER trigger_name",
        "ALTER TABLE tab ENABLE REPLICA TRIGGER trigger_name",
        "ALTER TABLE tab ENABLE REPLICA RULE rule_name",
        "ALTER TABLE tab ENABLE ROW LEVEL SECURITY",
        "ALTER TABLE tab ENABLE RULE rule_name",
        "ALTER TABLE tab ENABLE TRIGGER ALL",
        "ALTER TABLE tab ENABLE TRIGGER USER",
        "ALTER TABLE tab ENABLE TRIGGER trigger_name",
    ];
    for sql in statements {
        pg_and_generic().verified_stmt(sql);
    }
}

/// `TRUNCATE` with quoted and unquoted table lists, with and without the
/// `TABLE` keyword and a trailing `RESTRICT`.
#[test]
fn parse_truncate_table() {
    let statements = [
        "TRUNCATE TABLE \"users\", \"orders\" RESTART IDENTITY RESTRICT",
        "TRUNCATE users, orders RESTART IDENTITY",
    ];
    for sql in statements {
        pg_and_generic().verified_stmt(sql);
    }
}

/// `CREATE EXTENSION` with every combination of the `SCHEMA`, `VERSION` and
/// `CASCADE` parts of the `WITH` clause used below.
#[test]
fn parse_create_extension() {
    let statements = [
        "CREATE EXTENSION extension_name",
        "CREATE EXTENSION extension_name WITH SCHEMA schema_name",
        "CREATE EXTENSION extension_name WITH VERSION version",
        "CREATE EXTENSION extension_name WITH CASCADE",
        "CREATE EXTENSION extension_name WITH SCHEMA schema_name VERSION version CASCADE",
        "CREATE EXTENSION extension_name WITH SCHEMA schema_name CASCADE",
        "CREATE EXTENSION extension_name WITH VERSION version CASCADE",
        "CREATE EXTENSION extension_name WITH SCHEMA schema_name VERSION version",
    ];
    for sql in statements {
        pg_and_generic().verified_stmt(sql);
    }
}

/// `DROP EXTENSION` with one or two names, optional `IF EXISTS`, and an
/// optional `CASCADE`/`RESTRICT`, each compared against the expected
/// `Statement::DropExtension` value.
#[test]
fn parse_drop_extension() {
    /// Verifies `sql` with both dialects and compares the statement with a
    /// `DropExtension` built from the remaining arguments.
    fn assert_drop_extension(
        sql: &str,
        names: &[&str],
        if_exists: bool,
        cascade_or_restrict: Option<ReferentialAction>,
    ) {
        let expected = Statement::DropExtension(DropExtension {
            names: names.iter().map(|name| (*name).into()).collect(),
            if_exists,
            cascade_or_restrict,
        });
        assert_eq!(pg_and_generic().verified_stmt(sql), expected);
    }

    // Without IF EXISTS.
    assert_drop_extension(
        "DROP EXTENSION extension_name",
        &["extension_name"],
        false,
        None,
    );
    assert_drop_extension(
        "DROP EXTENSION extension_name CASCADE",
        &["extension_name"],
        false,
        Some(ReferentialAction::Cascade),
    );
    assert_drop_extension(
        "DROP EXTENSION extension_name RESTRICT",
        &["extension_name"],
        false,
        Some(ReferentialAction::Restrict),
    );
    assert_drop_extension(
        "DROP EXTENSION extension_name, extension_name2 CASCADE",
        &["extension_name", "extension_name2"],
        false,
        Some(ReferentialAction::Cascade),
    );
    assert_drop_extension(
        "DROP EXTENSION extension_name, extension_name2 RESTRICT",
        &["extension_name", "extension_name2"],
        false,
        Some(ReferentialAction::Restrict),
    );

    // With IF EXISTS.
    assert_drop_extension(
        "DROP EXTENSION IF EXISTS extension_name",
        &["extension_name"],
        true,
        None,
    );
    assert_drop_extension(
        "DROP EXTENSION IF EXISTS extension_name CASCADE",
        &["extension_name"],
        true,
        Some(ReferentialAction::Cascade),
    );
    assert_drop_extension(
        "DROP EXTENSION IF EXISTS extension_name RESTRICT",
        &["extension_name"],
        true,
        Some(ReferentialAction::Restrict),
    );
    assert_drop_extension(
        "DROP EXTENSION IF EXISTS extension_name1, extension_name2 CASCADE",
        &["extension_name1", "extension_name2"],
        true,
        Some(ReferentialAction::Cascade),
    );
    assert_drop_extension(
        "DROP EXTENSION IF EXISTS extension_name1, extension_name2 RESTRICT",
        &["extension_name1", "extension_name2"],
        true,
        Some(ReferentialAction::Restrict),
    );
}

/// `ALTER COLUMN ... TYPE ... USING ...` in its short form and in the
/// `SET DATA TYPE` form; the latter has its AST inspected.
#[test]
fn parse_alter_table_alter_column() {
    pg().verified_stmt("ALTER TABLE tab ALTER COLUMN is_active TYPE TEXT USING 'text'");

    let stmt = pg()
        .verified_stmt("ALTER TABLE tab ALTER COLUMN is_active SET DATA TYPE TEXT USING 'text'");
    if let AlterTableOperation::AlterColumn { column_name, op } = alter_table_op(stmt) {
        assert_eq!("is_active", column_name.to_string());

        let expected_op = AlterColumnOperation::SetDataType {
            data_type: DataType::Text,
            using: Some(Expr::Value(
                Value::SingleQuotedString("text".to_string()).with_empty_span(),
            )),
            // The statement spelled out `SET DATA TYPE`.
            had_set: true,
        };
        assert_eq!(op, expected_op);
    } else {
        unreachable!()
    }
}

/// `ALTER COLUMN ... ADD GENERATED ... AS IDENTITY`: statements that must
/// verify, followed by malformed ones and their exact parser error messages.
#[test]
fn parse_alter_table_alter_column_add_generated() {
    let accepted = [
        "ALTER TABLE t ALTER COLUMN id ADD GENERATED ALWAYS AS IDENTITY",
        "ALTER TABLE t ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY",
        "ALTER TABLE t ALTER COLUMN id ADD GENERATED AS IDENTITY",
        "ALTER TABLE t ALTER COLUMN id ADD GENERATED AS IDENTITY ( INCREMENT 1 MINVALUE 1 )",
        "ALTER TABLE t ALTER COLUMN id ADD GENERATED AS IDENTITY ( )",
    ];
    for sql in accepted {
        pg_and_generic().verified_stmt(sql);
    }

    // (malformed SQL, expected `ParserError::ParserError` message)
    let rejected = [
        (
            "ALTER TABLE t ALTER COLUMN id ADD GENERATED ( INCREMENT 1 MINVALUE 1 )",
            "Expected: AS, found: (",
        ),
        (
            "ALTER TABLE t ALTER COLUMN id ADD GENERATED AS IDENTITY ( INCREMENT )",
            "Expected: a value, found: )",
        ),
        (
            "ALTER TABLE t ALTER COLUMN id ADD GENERATED AS IDENTITY (",
            "Expected: ), found: EOF",
        ),
    ];
    for (sql, message) in rejected {
        let outcome = pg().parse_sql_statements(sql);
        assert_eq!(
            ParserError::ParserError(message.to_string()),
            outcome.unwrap_err()
        );
    }
}

/// `ALTER TABLE IF EXISTS ONLY ... ADD COLUMN ..., ADD COLUMN ...` must set
/// both flags and produce two `AddColumn` operations in order.
#[test]
fn parse_alter_table_add_columns() {
    // Expected shape of an `ADD COLUMN <name> <type>` with no extras.
    let add_column = |column_name: &str, data_type| AlterTableOperation::AddColumn {
        column_keyword: true,
        if_not_exists: false,
        column_def: ColumnDef {
            name: column_name.into(),
            data_type,
            options: vec![],
        },
        column_position: None,
    };

    let stmt =
        pg().verified_stmt("ALTER TABLE IF EXISTS ONLY tab ADD COLUMN a TEXT, ADD COLUMN b INT");
    match stmt {
        Statement::AlterTable(AlterTable {
            name,
            if_exists,
            only,
            operations,
            ..
        }) => {
            assert_eq!(name.to_string(), "tab");
            assert!(if_exists);
            assert!(only);

            let expected_operations = vec![
                add_column("a", DataType::Text),
                add_column("b", DataType::Int(None)),
            ];
            assert_eq!(operations, expected_operations);
        }
        _ => unreachable!(),
    }
}

#[test]
fn parse_alter_table_owner_to() {
    // Each entry pairs an `OWNER TO` statement with the owner the parser must report.
    let cases = [
        (
            "ALTER TABLE tab OWNER TO new_owner",
            Owner::Ident(Ident::new("new_owner")),
        ),
        (
            "ALTER TABLE tab OWNER TO postgres",
            Owner::Ident(Ident::new("postgres")),
        ),
        (
            // treats CREATE as an identifier
            "ALTER TABLE tab OWNER TO CREATE",
            Owner::Ident(Ident::new("CREATE")),
        ),
        (
            "ALTER TABLE tab OWNER TO \"new_owner\"",
            Owner::Ident(Ident::with_quote('"', "new_owner")),
        ),
        ("ALTER TABLE tab OWNER TO CURRENT_USER", Owner::CurrentUser),
        ("ALTER TABLE tab OWNER TO CURRENT_ROLE", Owner::CurrentRole),
        ("ALTER TABLE tab OWNER TO SESSION_USER", Owner::SessionUser),
    ];

    for (sql, expected_owner) in cases {
        match pg_and_generic().verified_stmt(sql) {
            Statement::AlterTable(AlterTable {
                name, operations, ..
            }) => {
                assert_eq!(name.to_string(), "tab");
                let expected_ops = vec![AlterTableOperation::OwnerTo {
                    new_owner: expected_owner,
                }];
                assert_eq!(operations, expected_ops);
            }
            _ => unreachable!("Expected an AlterTable statement"),
        }
    }

    // Trailing tokens after the owner, or a non-identifier owner, must be rejected.
    let rejected = [
        (
            "ALTER TABLE tab OWNER TO CREATE FOO",
            "Expected: end of statement, found: FOO",
        ),
        (
            "ALTER TABLE tab OWNER TO 4",
            "Expected: CURRENT_USER, CURRENT_ROLE, SESSION_USER or identifier after OWNER TO. sql parser error: Expected: identifier, found: 4",
        ),
    ];
    for (sql, message) in rejected {
        let outcome = pg().parse_sql_statements(sql);
        assert_eq!(
            ParserError::ParserError(message.to_string()),
            outcome.unwrap_err()
        );
    }
}

#[test]
fn parse_create_table_if_not_exists() {
    let stmt = pg_and_generic().verified_stmt("CREATE TABLE IF NOT EXISTS uk_cities ()");
    // The pattern itself requires `if_not_exists` to be set; anything else is a failure.
    if let Statement::CreateTable(CreateTable {
        name,
        if_not_exists: true,
        ..
    }) = stmt
    {
        assert_eq!("uk_cities", name.to_string());
    } else {
        unreachable!()
    }
}

#[test]
fn parse_bad_if_not_exists() {
    // Incomplete or misspelled `IF NOT EXISTS` clauses, each paired with the
    // token the parser is expected to complain about.
    let rejected = [
        ("CREATE TABLE NOT EXISTS uk_cities ()", "EXISTS"),
        ("CREATE TABLE IF EXISTS uk_cities ()", "EXISTS"),
        ("CREATE TABLE IF uk_cities ()", "uk_cities"),
        ("CREATE TABLE IF NOT uk_cities ()", "NOT"),
    ];

    for (sql, offending_token) in rejected {
        let outcome = pg().parse_sql_statements(sql);
        let message = format!("Expected: end of statement, found: {offending_token}");
        assert_eq!(ParserError::ParserError(message), outcome.unwrap_err());
    }
}

#[test]
fn parse_create_schema_if_not_exists() {
    let stmt = pg_and_generic().verified_stmt("CREATE SCHEMA IF NOT EXISTS schema_name");
    // Only a CreateSchema statement with `if_not_exists` set is acceptable.
    if let Statement::CreateSchema {
        if_not_exists: true,
        schema_name,
        ..
    } = stmt
    {
        assert_eq!("schema_name", schema_name.to_string())
    } else {
        unreachable!()
    }
}

#[test]
fn parse_drop_schema_if_exists() {
    let stmt = pg().verified_stmt("DROP SCHEMA IF EXISTS schema_name");
    // Only a Drop statement with `if_exists` set is acceptable; then check what is dropped.
    if let Statement::Drop {
        object_type,
        if_exists: true,
        ..
    } = stmt
    {
        assert_eq!(object_type, ObjectType::Schema)
    } else {
        unreachable!()
    }
}

// Checks that a `COPY ... FROM stdin;` statement followed by an inline data
// payload is handled as a single statement by both dialects.
#[test]
fn parse_copy_from_stdin() {
    // Raw string: the payload below is passed to the parser verbatim. It mixes
    // tab-separated rows with free-form lines containing quotes, braces,
    // non-ASCII characters and backslash sequences, and ends with a `\\.` line.
    let sql = r#"COPY public.actor (actor_id, first_name, last_name, last_update, value) FROM stdin;
1	PENELOPE	GUINESS	2006-02-15 09:34:33 0.11111
2	NICK	WAHLBERG	2006-02-15 09:34:33 0.22222
3	ED	CHASE	2006-02-15 09:34:33 0.312323
4	JENNIFER	DAVIS	2006-02-15 09:34:33 0.3232
5	JOHNNY	LOLLOBRIGIDA	2006-02-15 09:34:33 1.343
6	BETTE	NICHOLSON	2006-02-15 09:34:33 5.0
7	GRACE	MOSTEL	2006-02-15 09:34:33 6.0
8	MATTHEW	JOHANSSON	2006-02-15 09:34:33 7.0
9	JOE	SWANK	2006-02-15 09:34:33 8.0
10	CHRISTIAN	GABLE	2006-02-15 09:34:33 9.1
11	ZERO	CAGE	2006-02-15 09:34:33 10.001
12	KARL	BERRY	2017-11-02 19:15:42.308637+08 11.001
A Fateful Reflection of a Waitress And a Boat who must Discover a Sumo Wrestler in Ancient China
Kwara & Kogi
{"Deleted Scenes","Behind the Scenes"}
'awe':5 'awe-inspir':4 'barbarella':1 'cat':13 'conquer':16 'dog':18 'feminist':10 'inspir':6 'monasteri':21 'must':15 'stori':7 'streetcar':2
PHP	₱ USD $
\N  Some other value
\\."#;
    // NOTE(review): the empty canonical string presumably tells the helper to
    // skip the round-trip comparison — confirm in test_utils.
    pg_and_generic().one_statement_parses_to(sql, "");
}

#[test]
fn test_copy_from() {
    // Expected AST for `COPY users FROM 'data.csv'` followed by the given legacy options.
    let expected = |legacy_options: Vec<CopyLegacyOption>| Statement::Copy {
        source: CopySource::Table {
            table_name: ObjectName::from(vec!["users".into()]),
            columns: vec![],
        },
        to: false,
        target: CopyTarget::File {
            filename: "data.csv".to_string(),
        },
        options: vec![],
        legacy_options,
        values: vec![],
    };

    // Statement text paired with the legacy options it should produce, in order.
    let cases = [
        ("COPY users FROM 'data.csv'", vec![]),
        (
            "COPY users FROM 'data.csv' DELIMITER ','",
            vec![CopyLegacyOption::Delimiter(',')],
        ),
        (
            "COPY users FROM 'data.csv' DELIMITER ',' CSV HEADER",
            vec![
                CopyLegacyOption::Delimiter(','),
                CopyLegacyOption::Csv(vec![CopyLegacyCsvOption::Header]),
            ],
        ),
    ];

    for (sql, legacy_options) in cases {
        let stmt = pg().verified_stmt(sql);
        assert_eq!(stmt, expected(legacy_options));
    }
}

#[test]
fn test_copy_to() {
    // Expected AST for `COPY users TO 'data.csv'` followed by the given legacy options.
    let copy_users_to_file = |legacy_options: Vec<CopyLegacyOption>| Statement::Copy {
        source: CopySource::Table {
            table_name: ObjectName::from(vec!["users".into()]),
            columns: vec![],
        },
        to: true,
        target: CopyTarget::File {
            filename: "data.csv".to_string(),
        },
        options: vec![],
        legacy_options,
        values: vec![],
    };

    // No options at all.
    let plain = pg().verified_stmt("COPY users TO 'data.csv'");
    assert_eq!(plain, copy_users_to_file(vec![]));

    // A single legacy DELIMITER option.
    let delimited = pg().verified_stmt("COPY users TO 'data.csv' DELIMITER ','");
    assert_eq!(
        delimited,
        copy_users_to_file(vec![CopyLegacyOption::Delimiter(',')])
    );

    // DELIMITER followed by CSV with its HEADER sub-option.
    let csv_with_header = pg().verified_stmt("COPY users TO 'data.csv' DELIMITER ',' CSV HEADER");
    assert_eq!(
        csv_with_header,
        copy_users_to_file(vec![
            CopyLegacyOption::Delimiter(','),
            CopyLegacyOption::Csv(vec![CopyLegacyCsvOption::Header]),
        ])
    );
}

#[test]
fn parse_copy_from() {
    let sql = "COPY table (a, b) FROM 'file.csv' WITH
    (
        FORMAT CSV,
        FREEZE,
        FREEZE TRUE,
        FREEZE FALSE,
        DELIMITER ',',
        NULL '',
        HEADER,
        HEADER TRUE,
        HEADER FALSE,
        QUOTE '\"',
        ESCAPE '\\',
        FORCE_QUOTE (a, b),
        FORCE_NOT_NULL (a),
        FORCE_NULL (b),
        ENCODING 'utf8'
    )";
    let parsed = pg_and_generic().one_statement_parses_to(sql, "");

    // One expected entry per option in the WITH list, in the same order.
    // A bare FREEZE / HEADER is expected to come back as `true`.
    let expected_options = vec![
        CopyOption::Format("CSV".into()),
        CopyOption::Freeze(true),
        CopyOption::Freeze(true),
        CopyOption::Freeze(false),
        CopyOption::Delimiter(','),
        CopyOption::Null("".into()),
        CopyOption::Header(true),
        CopyOption::Header(true),
        CopyOption::Header(false),
        CopyOption::Quote('"'),
        CopyOption::Escape('\\'),
        CopyOption::ForceQuote(vec!["a".into(), "b".into()]),
        CopyOption::ForceNotNull(vec!["a".into()]),
        CopyOption::ForceNull(vec!["b".into()]),
        CopyOption::Encoding("utf8".into()),
    ];
    let expected = Statement::Copy {
        source: CopySource::Table {
            table_name: ObjectName::from(vec!["table".into()]),
            columns: vec!["a".into(), "b".into()],
        },
        to: false,
        target: CopyTarget::File {
            filename: "file.csv".into(),
        },
        options: expected_options,
        legacy_options: vec![],
        values: vec![],
    };

    assert_eq!(parsed, expected);
}

#[test]
fn parse_copy_from_error() {
    // A parenthesised query is not a valid source for `COPY ... FROM`.
    let sql = "COPY (SELECT 42 AS a, 'hello' AS b) FROM 'query.csv'";
    let outcome = pg().parse_sql_statements(sql);

    let expected =
        ParserError::ParserError("COPY ... FROM does not support query as a source".to_string());
    assert_eq!(expected, outcome.unwrap_err());
}

#[test]
fn parse_copy_to() {
    // Expected `COPY <source> TO <target>` AST; legacy options and inline
    // values are empty in every case below.
    let copy_to = |source: CopySource, target: CopyTarget, options: Vec<CopyOption>| {
        Statement::Copy {
            source,
            to: true,
            target,
            options,
            legacy_options: vec![],
            values: vec![],
        }
    };
    // A plain table source with no column list.
    let table = |table: &str| CopySource::Table {
        table_name: ObjectName::from(vec![table.into()]),
        columns: vec![],
    };

    // Table to file.
    let stmt = pg().verified_stmt("COPY users TO 'data.csv'");
    assert_eq!(
        stmt,
        copy_to(
            table("users"),
            CopyTarget::File {
                filename: "data.csv".to_string(),
            },
            vec![],
        )
    );

    // Table to STDOUT with a parenthesised option list.
    let stmt = pg().verified_stmt("COPY country TO STDOUT (DELIMITER '|')");
    assert_eq!(
        stmt,
        copy_to(
            table("country"),
            CopyTarget::Stdout,
            vec![CopyOption::Delimiter('|')],
        )
    );

    // Table to an external program.
    let stmt =
        pg().verified_stmt("COPY country TO PROGRAM 'gzip > /usr1/proj/bray/sql/country_data.gz'");
    assert_eq!(
        stmt,
        copy_to(
            table("country"),
            CopyTarget::Program {
                command: "gzip > /usr1/proj/bray/sql/country_data.gz".into(),
            },
            vec![],
        )
    );

    // Query to file: the source is the full AST of `SELECT 42 AS a, 'hello' AS b`.
    let projection = vec![
        SelectItem::ExprWithAlias {
            expr: Expr::value(number("42")),
            alias: Ident::new("a"),
        },
        SelectItem::ExprWithAlias {
            expr: Expr::Value(Value::SingleQuotedString("hello".into()).with_empty_span()),
            alias: Ident::new("b"),
        },
    ];
    let select = Select {
        select_token: AttachedToken::empty(),
        distinct: None,
        top: None,
        top_before_distinct: false,
        projection,
        exclude: None,
        into: None,
        from: vec![],
        lateral_views: vec![],
        prewhere: None,
        selection: None,
        group_by: GroupByExpr::Expressions(vec![], vec![]),
        having: None,
        named_window: vec![],
        window_before_qualify: false,
        cluster_by: vec![],
        distribute_by: vec![],
        sort_by: vec![],
        qualify: None,
        value_table_mode: None,
        connect_by: None,
        flavor: SelectFlavor::Standard,
    };
    let query = Query {
        with: None,
        body: Box::new(SetExpr::Select(Box::new(select))),
        order_by: None,
        limit_clause: None,
        fetch: None,
        locks: vec![],
        for_clause: None,
        settings: None,
        format_clause: None,
        pipe_operators: vec![],
    };

    let stmt = pg().verified_stmt("COPY (SELECT 42 AS a, 'hello' AS b) TO 'query.csv'");
    assert_eq!(
        stmt,
        copy_to(
            CopySource::Query(Box::new(query)),
            CopyTarget::File {
                filename: "query.csv".into(),
            },
            vec![],
        )
    );
}

#[test]
fn parse_copy_from_before_v9_0() {
    // Expected AST for `COPY users FROM 'data.csv'` carrying only legacy options.
    let copy_from_file = |legacy_options: Vec<CopyLegacyOption>| Statement::Copy {
        source: CopySource::Table {
            table_name: ObjectName::from(vec!["users".into()]),
            columns: vec![],
        },
        to: false,
        target: CopyTarget::File {
            filename: "data.csv".to_string(),
        },
        options: vec![],
        legacy_options,
        values: vec![],
    };

    // Full legacy option list, with every CSV sub-option grouped under `Csv`.
    let sql = "COPY users FROM 'data.csv' BINARY DELIMITER ',' NULL 'null' CSV HEADER QUOTE '\"' ESCAPE '\\' FORCE NOT NULL column";
    let stmt = pg().verified_stmt(sql);
    assert_eq!(
        stmt,
        copy_from_file(vec![
            CopyLegacyOption::Binary,
            CopyLegacyOption::Delimiter(','),
            CopyLegacyOption::Null("null".into()),
            CopyLegacyOption::Csv(vec![
                CopyLegacyCsvOption::Header,
                CopyLegacyCsvOption::Quote('"'),
                CopyLegacyCsvOption::Escape('\\'),
                CopyLegacyCsvOption::ForceNotNull(vec!["column".into()]),
            ]),
        ])
    );

    // test 'AS' keyword
    let sql = "COPY users FROM 'data.csv' DELIMITER AS ',' NULL AS 'null' CSV QUOTE AS '\"' ESCAPE AS '\\'";
    let stmt = pg_and_generic().one_statement_parses_to(sql, "");
    assert_eq!(
        stmt,
        copy_from_file(vec![
            CopyLegacyOption::Delimiter(','),
            CopyLegacyOption::Null("null".into()),
            CopyLegacyOption::Csv(vec![
                CopyLegacyCsvOption::Quote('"'),
                CopyLegacyCsvOption::Escape('\\'),
            ]),
        ])
    );
}

#[test]
fn parse_copy_to_before_v9_0() {
    let sql = "COPY users TO 'data.csv' BINARY DELIMITER ',' NULL 'null' CSV HEADER QUOTE '\"' ESCAPE '\\' FORCE QUOTE column";
    let stmt = pg().verified_stmt(sql);

    // CSV sub-options are expected to be grouped under the single `Csv` legacy option.
    let csv_options = vec![
        CopyLegacyCsvOption::Header,
        CopyLegacyCsvOption::Quote('"'),
        CopyLegacyCsvOption::Escape('\\'),
        CopyLegacyCsvOption::ForceQuote(vec!["column".into()]),
    ];
    let expected = Statement::Copy {
        source: CopySource::Table {
            table_name: ObjectName::from(vec!["users".into()]),
            columns: vec![],
        },
        to: true,
        target: CopyTarget::File {
            filename: "data.csv".to_string(),
        },
        options: vec![],
        legacy_options: vec![
            CopyLegacyOption::Binary,
            CopyLegacyOption::Delimiter(','),
            CopyLegacyOption::Null("null".into()),
            CopyLegacyOption::Csv(csv_options),
        ],
        values: vec![],
    };

    assert_eq!(stmt, expected);
}

#[test]
fn parse_set() {
    // Expected AST for a `SET` of one (possibly dotted) variable to one value.
    let assignment = |scope: Option<ContextModifier>, path: Vec<Ident>, value: Expr| {
        Statement::Set(Set::SingleAssignment {
            scope,
            hivevar: false,
            variable: ObjectName::from(path),
            values: vec![value],
        })
    };
    // An unquoted identifier used as the assigned value.
    let ident = |name: &str| Expr::Identifier(Ident::new(name));

    assert_eq!(
        pg_and_generic().verified_stmt("SET a = b"),
        assignment(None, vec![Ident::new("a")], ident("b"))
    );

    assert_eq!(
        pg_and_generic().verified_stmt("SET a = 'b'"),
        assignment(
            None,
            vec![Ident::new("a")],
            Expr::Value(Value::SingleQuotedString("b".into()).with_empty_span()),
        )
    );

    assert_eq!(
        pg_and_generic().verified_stmt("SET a = 0"),
        assignment(None, vec![Ident::new("a")], Expr::value(number("0")))
    );

    // DEFAULT is expected to come back as a plain identifier.
    assert_eq!(
        pg_and_generic().verified_stmt("SET a = DEFAULT"),
        assignment(None, vec![Ident::new("a")], ident("DEFAULT"))
    );

    assert_eq!(
        pg_and_generic().verified_stmt("SET LOCAL a = b"),
        assignment(
            Some(ContextModifier::Local),
            vec![Ident::new("a")],
            ident("b"),
        )
    );

    // Dotted variable names become multi-part object names.
    assert_eq!(
        pg_and_generic().verified_stmt("SET a.b.c = b"),
        assignment(
            None,
            vec![Ident::new("a"), Ident::new("b"), Ident::new("c")],
            ident("b"),
        )
    );

    // No whitespace around `=` on input; the canonical form adds it.
    let hive_stmt = pg_and_generic().one_statement_parses_to(
        "SET hive.tez.auto.reducer.parallelism=false",
        "SET hive.tez.auto.reducer.parallelism = false",
    );
    assert_eq!(
        hive_stmt,
        assignment(
            None,
            vec![
                Ident::new("hive"),
                Ident::new("tez"),
                Ident::new("auto"),
                Ident::new("reducer"),
                Ident::new("parallelism"),
            ],
            Expr::Value(Value::Boolean(false).with_empty_span()),
        )
    );

    // `TO` is accepted in place of `=` and canonicalised to `=`.
    pg_and_generic().one_statement_parses_to("SET a TO b", "SET a = b");

    // Incomplete statements paired with the error each must produce.
    let rejected = [
        ("SET", "Expected: identifier, found: EOF"),
        ("SET a b", "Expected: equals sign or TO, found: b"),
        ("SET a =", "Expected: variable value, found: EOF"),
    ];
    for (sql, message) in rejected {
        assert_eq!(
            pg_and_generic().parse_sql_statements(sql),
            Err(ParserError::ParserError(message.to_string())),
        );
    }
}

#[test]
fn parse_set_role() {
    // (statement, expected context modifier, expected role name); `NONE` maps to no role.
    let cases = [
        (
            "SET SESSION ROLE NONE",
            Some(ContextModifier::Session),
            None,
        ),
        (
            "SET LOCAL ROLE \"rolename\"",
            Some(ContextModifier::Local),
            Some(Ident::with_quote('"', "rolename")),
        ),
        (
            "SET ROLE 'rolename'",
            None,
            Some(Ident::with_quote('\'', "rolename")),
        ),
    ];

    for (query, context_modifier, role_name) in cases {
        let stmt = pg_and_generic().verified_stmt(query);
        assert_eq!(
            stmt,
            Statement::Set(Set::SetRole {
                context_modifier,
                role_name,
            })
        );
        // The statement must also display exactly as it was written.
        assert_eq!(query, stmt.to_string());
    }
}

#[test]
fn parse_show() {
    // Each statement repeats one word twice; both occurrences must be kept as
    // separate identifiers (including the keyword-like `ALL`).
    for (sql, word) in [("SHOW a a", "a"), ("SHOW ALL ALL", "ALL")] {
        let stmt = pg_and_generic().verified_stmt(sql);
        let expected = Statement::ShowVariable {
            variable: vec![word.into(), word.into()],
        };
        assert_eq!(stmt, expected);
    }
}

#[test]
fn parse_deallocate() {
    // (statement, expected name, whether the optional PREPARE keyword was present)
    let cases = [
        ("DEALLOCATE a", "a", false),
        ("DEALLOCATE ALL", "ALL", false),
        ("DEALLOCATE PREPARE a", "a", true),
        ("DEALLOCATE PREPARE ALL", "ALL", true),
    ];

    for (sql, name, prepare) in cases {
        let stmt = pg_and_generic().verified_stmt(sql);
        let expected = Statement::Deallocate {
            name: name.into(),
            prepare,
        };
        assert_eq!(stmt, expected);
    }
}

#[test]
fn parse_execute() {
    // Expected AST for `EXECUTE a ...`; only the parameter list, its
    // parentheses flag and the USING list vary between the cases below.
    let execute_a = |parameters: Vec<Expr>, has_parentheses: bool, using: Vec<ExprWithAlias>| {
        Statement::Execute {
            name: Some(ObjectName::from(vec!["a".into()])),
            parameters,
            has_parentheses,
            using,
            immediate: false,
            into: vec![],
            output: false,
            default: false,
        }
    };
    // Expected AST for an unaliased `CAST(<digits> AS SMALLINT)` USING argument.
    let smallint_cast = |digits: &str| ExprWithAlias {
        expr: Expr::Cast {
            kind: CastKind::Cast,
            expr: Box::new(Expr::Value(
                Value::Number(digits.parse().unwrap(), false).with_empty_span(),
            )),
            data_type: DataType::SmallInt(None),
            format: None,
        },
        alias: None,
    };

    // Bare statement name.
    let stmt = pg_and_generic().verified_stmt("EXECUTE a");
    assert_eq!(stmt, execute_a(vec![], false, vec![]));

    // Parenthesised positional parameters.
    let stmt = pg_and_generic().verified_stmt("EXECUTE a(1, 't')");
    assert_eq!(
        stmt,
        execute_a(
            vec![
                Expr::value(number("1")),
                Expr::Value(Value::SingleQuotedString("t".to_string()).with_empty_span()),
            ],
            true,
            vec![],
        )
    );

    // USING clause with two cast expressions.
    let stmt = pg_and_generic()
        .verified_stmt("EXECUTE a USING CAST(1337 AS SMALLINT), CAST(7331 AS SMALLINT)");
    assert_eq!(
        stmt,
        execute_a(
            vec![],
            false,
            vec![smallint_cast("1337"), smallint_cast("7331")],
        )
    );
}

#[test]
fn parse_prepare() {
    // PREPARE without a type list, wrapping an INSERT.
    let insert_sql = "PREPARE a AS INSERT INTO customers VALUES (a1, a2, a3)";
    let prepared_insert = match pg_and_generic().verified_stmt(insert_sql) {
        Statement::Prepare {
            name,
            data_types,
            statement,
            ..
        } => {
            assert_eq!(name, "a".into());
            assert!(data_types.is_empty());

            statement
        }
        _ => unreachable!(),
    };
    match prepared_insert.as_ref() {
        Statement::Insert(Insert {
            table: table_name,
            columns,
            source: Some(source),
            ..
        }) => {
            assert_eq!(table_name.to_string(), "customers");
            assert!(columns.is_empty());

            // A single VALUES row made of three bare identifiers.
            let row: Vec<Expr> = ["a1", "a2", "a3"]
                .iter()
                .map(|name| Expr::Identifier(Ident::new(*name)))
                .collect();
            let expected_rows = [row];
            match &*source.body {
                SetExpr::Values(Values { rows, .. }) => {
                    assert_eq!(rows.as_slice(), &expected_rows)
                }
                _ => unreachable!(),
            }
        }
        _ => unreachable!(),
    };

    // PREPARE with an explicit parameter type list, wrapping a SELECT.
    let inner_query_sql = "SELECT * FROM customers WHERE customers.id = a1";
    let prepared_query = match pg_and_generic()
        .verified_stmt("PREPARE a (INT, TEXT) AS SELECT * FROM customers WHERE customers.id = a1")
    {
        Statement::Prepare {
            name,
            data_types,
            statement,
            ..
        } => {
            assert_eq!(name, "a".into());
            assert_eq!(data_types, vec![DataType::Int(None), DataType::Text]);

            statement
        }
        _ => unreachable!(),
    };
    // The prepared body must equal the same query parsed on its own.
    let expected_query = Box::new(Statement::Query(Box::new(
        pg_and_generic().verified_query(inner_query_sql),
    )));
    assert_eq!(prepared_query, expected_query);
}

#[test]
fn parse_pg_on_conflict() {
    // `<column> = <value>` entry of a `DO UPDATE SET` list.
    let set = |column: &str, value: Expr| Assignment {
        target: AssignmentTarget::ColumnName(ObjectName::from(vec![Ident::from(column)])),
        value,
    };
    // `EXCLUDED.<column>` compound identifier.
    let excluded = |column: &str| {
        Expr::CompoundIdentifier(vec![Ident::from("EXCLUDED"), Ident::from(column)])
    };
    // Positional placeholder such as `$1`.
    let placeholder =
        |marker: &str| Expr::Value((Value::Placeholder(marker.to_string())).with_empty_span());
    // The `dsize > $2` predicate shared by the two `DO UPDATE ... WHERE` cases.
    let dsize_gt_second_placeholder = || Expr::BinaryOp {
        left: Box::new(Expr::Identifier(Ident {
            value: "dsize".to_string(),
            quote_style: None,
            span: Span::empty(),
        })),
        op: BinaryOperator::Gt,
        right: Box::new(placeholder("$2")),
    };

    // Single conflict column, single assignment.
    let Statement::Insert(Insert {
        on:
            Some(OnInsert::OnConflict(OnConflict {
                conflict_target: Some(ConflictTarget::Columns(cols)),
                action,
            })),
        ..
    }) = pg_and_generic().verified_stmt(
        "INSERT INTO distributors (did, dname) \
        VALUES (5, 'Gizmo Transglobal'), (6, 'Associated Computing, Inc') \
        ON CONFLICT(did) \
        DO UPDATE SET dname = EXCLUDED.dname",
    )
    else {
        unreachable!()
    };
    assert_eq!(vec![Ident::from("did")], cols);
    assert_eq!(
        OnConflictAction::DoUpdate(DoUpdate {
            assignments: vec![set("dname", excluded("dname"))],
            selection: None
        }),
        action
    );

    // Two conflict columns, two assignments.
    let Statement::Insert(Insert {
        on:
            Some(OnInsert::OnConflict(OnConflict {
                conflict_target: Some(ConflictTarget::Columns(cols)),
                action,
            })),
        ..
    }) = pg_and_generic().verified_stmt(
        "INSERT INTO distributors (did, dname, area) \
        VALUES (5, 'Gizmo Transglobal', 'Mars'), (6, 'Associated Computing, Inc', 'Venus') \
        ON CONFLICT(did, area) \
        DO UPDATE SET dname = EXCLUDED.dname, area = EXCLUDED.area",
    )
    else {
        unreachable!()
    };
    assert_eq!(vec![Ident::from("did"), Ident::from("area")], cols);
    assert_eq!(
        OnConflictAction::DoUpdate(DoUpdate {
            assignments: vec![
                set("dname", excluded("dname")),
                set("area", excluded("area")),
            ],
            selection: None
        }),
        action
    );

    // No conflict target at all, DO NOTHING.
    let Statement::Insert(Insert {
        on:
            Some(OnInsert::OnConflict(OnConflict {
                conflict_target: None,
                action,
            })),
        ..
    }) = pg_and_generic().verified_stmt(
        "INSERT INTO distributors (did, dname) \
        VALUES (5, 'Gizmo Transglobal'), (6, 'Associated Computing, Inc') \
        ON CONFLICT DO NOTHING",
    )
    else {
        unreachable!()
    };
    assert_eq!(OnConflictAction::DoNothing, action);

    // Conflict column with placeholders and a WHERE clause on the update.
    let Statement::Insert(Insert {
        on:
            Some(OnInsert::OnConflict(OnConflict {
                conflict_target: Some(ConflictTarget::Columns(cols)),
                action,
            })),
        ..
    }) = pg_and_generic().verified_stmt(
        "INSERT INTO distributors (did, dname, dsize) \
        VALUES (5, 'Gizmo Transglobal', 1000), (6, 'Associated Computing, Inc', 1010) \
        ON CONFLICT(did) \
        DO UPDATE SET dname = $1 WHERE dsize > $2",
    )
    else {
        unreachable!()
    };
    assert_eq!(vec![Ident::from("did")], cols);
    assert_eq!(
        OnConflictAction::DoUpdate(DoUpdate {
            assignments: vec![set("dname", placeholder("$1"))],
            selection: Some(dsize_gt_second_placeholder())
        }),
        action
    );

    // Conflict target given as a named constraint.
    let Statement::Insert(Insert {
        on:
            Some(OnInsert::OnConflict(OnConflict {
                conflict_target: Some(ConflictTarget::OnConstraint(cname)),
                action,
            })),
        ..
    }) = pg_and_generic().verified_stmt(
        "INSERT INTO distributors (did, dname, dsize) \
        VALUES (5, 'Gizmo Transglobal', 1000), (6, 'Associated Computing, Inc', 1010) \
        ON CONFLICT ON CONSTRAINT distributors_did_pkey \
        DO UPDATE SET dname = $1 WHERE dsize > $2",
    )
    else {
        unreachable!()
    };
    assert_eq!(
        ObjectName::from(vec![Ident::from("distributors_did_pkey")]),
        cname
    );
    assert_eq!(
        OnConflictAction::DoUpdate(DoUpdate {
            assignments: vec![set("dname", placeholder("$1"))],
            selection: Some(dsize_gt_second_placeholder())
        }),
        action
    );
}

#[test]
fn parse_pg_returning() {
    // INSERT ... RETURNING <column>
    let Statement::Insert(Insert { returning, .. }) = pg_and_generic().verified_stmt(
        "INSERT INTO distributors (did, dname) VALUES (DEFAULT, 'XYZ Widgets') RETURNING did",
    ) else {
        unreachable!()
    };
    let expected_items = vec![SelectItem::UnnamedExpr(Expr::Identifier("did".into()))];
    assert_eq!(Some(expected_items), returning);

    // UPDATE ... RETURNING with a mix of aliased and plain columns
    let Statement::Update(Update { returning, .. }) = pg_and_generic().verified_stmt(
        "UPDATE weather SET temp_lo = temp_lo + 1, temp_hi = temp_lo + 15, prcp = DEFAULT \
             WHERE city = 'San Francisco' AND date = '2003-07-03' \
             RETURNING temp_lo AS lo, temp_hi AS hi, prcp",
    ) else {
        unreachable!()
    };
    let expected_items = vec![
        SelectItem::ExprWithAlias {
            expr: Expr::Identifier("temp_lo".into()),
            alias: "lo".into(),
        },
        SelectItem::ExprWithAlias {
            expr: Expr::Identifier("temp_hi".into()),
            alias: "hi".into(),
        },
        SelectItem::UnnamedExpr(Expr::Identifier("prcp".into())),
    ];
    assert_eq!(Some(expected_items), returning);

    // DELETE ... RETURNING *
    let Statement::Delete(Delete { returning, .. }) =
        pg_and_generic().verified_stmt("DELETE FROM tasks WHERE status = 'DONE' RETURNING *")
    else {
        unreachable!()
    };
    let expected_items = vec![SelectItem::Wildcard(WildcardAdditionalOptions::default())];
    assert_eq!(Some(expected_items), returning);
}

/// Shared check for an infix binary operator between the identifiers `a` and `b`.
///
/// The unspaced spelling `a{operator}b` is tokenized with the PostgreSQL dialect
/// and must yield exactly three tokens. Then, using `dialect`, the spaced spelling
/// is passed through `verified_expr` and the unspaced one through `expr_parses_to`
/// (with the spaced spelling as its second argument); both results must equal a
/// `BinaryOp` whose operator is `expected`.
fn test_operator(operator: &str, dialect: &TestedDialects, expected: BinaryOperator) {
    let compact = format!("a{operator}b");
    let str_expr_canonical = format!("a {operator} b");

    let operator_tokens = sqlparser::tokenizer::Tokenizer::new(&PostgreSqlDialect {}, &compact)
        .tokenize()
        .unwrap();
    assert_eq!(
        operator_tokens.len(),
        3,
        "binary op should be 3 tokens, not {operator_tokens:?}"
    );

    let operand = |name: &str| Box::new(Expr::Identifier(Ident::new(name)));
    let expected_expr = Expr::BinaryOp {
        left: operand("a"),
        op: expected,
        right: operand("b"),
    };

    assert_eq!(expected_expr, dialect.verified_expr(&str_expr_canonical));
    assert_eq!(
        expected_expr,
        dialect.expr_parses_to(&compact, &str_expr_canonical)
    );
}

#[test]
fn parse_pg_binary_ops() {
    // Each case: operator spelling, expected AST operator, dialects to run against.
    let cases = [
        // `#` and `^` are limited to the PostgreSQL dialect: with the generic
        // dialect they conflict with identifiers.
        ("#", BinaryOperator::PGBitwiseXor, pg()),
        ("^", BinaryOperator::PGExp, pg()),
        (">>", BinaryOperator::PGBitwiseShiftRight, pg_and_generic()),
        ("<<", BinaryOperator::PGBitwiseShiftLeft, pg_and_generic()),
        ("&&", BinaryOperator::PGOverlap, pg()),
        ("^@", BinaryOperator::PGStartsWith, pg()),
    ];

    for (symbol, expected_op, dialects) in cases {
        test_operator(symbol, &dialects, expected_op);
    }
}

#[test]
fn parse_pg_custom_binary_ops() {
    // Postgres lets users declare custom binary operators built from any of
    // these characters:
    //  + - * / < > = ~ ! @ # % ^ & | ` ?
    //
    // The list below covers operators used by common extensions.
    let operators = [
        // --- PostGIS ---
        "&&&",   // n-D bounding boxes intersect
        "|=|",   // distance between A and B trajectories at their closest point of approach
        "<<#>>", // n-D distance between A and B bounding boxes
        // --- PGroonga ---
        "&@",   // full text search by a keyword
        "&@~",  // full text search by easy to use query language
        "&@*",  // similar search
        "&`",   // advanced search by ECMAScript like query language
        "&@|",  // full text search by an array of keywords
        "&@~|", // full text search by an array of queries in easy to use query language
        // --- pgtrgm ---
        "<<%", // second argument has a continuous extent of an ordered trigram set that matches word boundaries
        "%>>", // commutator of <<%
        "<<<->", // distance between arguments
        // --- hstore ---
        "#=", // replace fields with matching values from hstore
        // --- ranges ---
        "-|-", // is adjacent to
        // --- pg_similarity ---
        "~++", // L1 distance
        "~##", // cosine distance
        "~-~", // Dice coefficient
        "~!!", // Euclidean distance
        "~@~", // Hamming distance
        "~??", // Jaccard coefficient
        "~%%", // Jaro distance
        "~@@", // Jaro-Winkler distance
        "~==", // Levenshtein distance
        "~^^", // matching coefficient
        "~||", // Monge-Elkan coefficient
        "~#~", // Needleman-Wunsch coefficient
        "~**", // overlap coefficient
        "~~~", // Q-gram distance
        "~=~", // Smith-Waterman coefficient
        "~!~", // Smith-Waterman-Gotoh coefficient
        "~*~", // Soundex distance
        // --- soundex_operator ---
        ">@@<", // Soundex matches
        "<@@>", // Soundex doesn't match
    ];

    // Every spelling is expected to surface as `BinaryOperator::Custom` holding
    // the operator text itself.
    for symbol in operators {
        let expected = BinaryOperator::Custom(String::from(symbol));
        test_operator(symbol, &pg(), expected);
    }
}

#[test]
fn parse_ampersand_arobase() {
    // SQL Server reads `a&@b` as `(a) & (@b)`, whereas PostgreSQL reads it as
    // `(a) &@ (b)`; this test pins the PostgreSQL reading.
    let input = "a&@b";
    let canonical = "a &@ b";
    pg().expr_parses_to(input, canonical);
}

#[test]
fn parse_pg_unary_ops() {
    // Prefix operator spelling paired with the AST operator it must produce.
    let cases = [
        ("|/", UnaryOperator::PGSquareRoot),
        ("||/", UnaryOperator::PGCubeRoot),
        ("!!", UnaryOperator::PGPrefixFactorial),
        ("@", UnaryOperator::PGAbs),
    ];

    for (symbol, op) in cases {
        let sql = format!("SELECT {}a", symbol);
        let select = pg().verified_only_select(&sql);
        let expected = SelectItem::UnnamedExpr(Expr::UnaryOp {
            op,
            expr: Box::new(Expr::Identifier(Ident::new("a"))),
        });
        assert_eq!(expected, select.projection[0]);
    }
}

#[test]
fn parse_pg_postfix_factorial() {
    // `!` is the only postfix operator covered here; it is applied to the
    // identifier `a`.
    let symbol = "!";
    let op = UnaryOperator::PGPostfixFactorial;

    let sql = format!("SELECT a{}", symbol);
    let select = pg().verified_only_select(&sql);
    let expected = SelectItem::UnnamedExpr(Expr::UnaryOp {
        op,
        expr: Box::new(Expr::Identifier(Ident::new("a"))),
    });
    assert_eq!(expected, select.projection[0]);
}

#[test]
fn parse_pg_regex_match_ops() {
    // Single-quoted string literal with an empty span.
    let text = |s: &str| Expr::Value(single_quoted_string(s).with_empty_span());

    let pg_regex_match_ops = [
        ("~", BinaryOperator::PGRegexMatch),
        ("~*", BinaryOperator::PGRegexIMatch),
        ("!~", BinaryOperator::PGRegexNotMatch),
        ("!~*", BinaryOperator::PGRegexNotIMatch),
    ];

    // First pass: the right-hand side is a single pattern.
    for (str_op, op) in &pg_regex_match_ops {
        let sql = format!("SELECT 'abc' {str_op} '^a'");
        let select = pg().verified_only_select(&sql);
        let expected = SelectItem::UnnamedExpr(Expr::BinaryOp {
            left: Box::new(text("abc")),
            op: op.clone(),
            right: Box::new(text("^a")),
        });
        assert_eq!(expected, select.projection[0]);
    }

    // Second pass: the right-hand side is `ANY` over an array of patterns.
    for (str_op, op) in &pg_regex_match_ops {
        let sql = format!("SELECT 'abc' {str_op} ANY(ARRAY['^a', 'x'])");
        let select = pg().verified_only_select(&sql);
        let patterns = Expr::Array(Array {
            elem: vec![text("^a"), text("x")],
            named: true,
        });
        let expected = SelectItem::UnnamedExpr(Expr::AnyOp {
            left: Box::new(text("abc")),
            compare_op: op.clone(),
            right: Box::new(patterns),
            is_some: false,
        });
        assert_eq!(expected, select.projection[0]);
    }
}

#[test]
fn parse_pg_like_match_ops() {
    // Single-quoted string literal with an empty span.
    let text = |s: &str| Expr::Value(single_quoted_string(s).with_empty_span());

    let pg_like_match_ops = [
        ("~~", BinaryOperator::PGLikeMatch),
        ("~~*", BinaryOperator::PGILikeMatch),
        ("!~~", BinaryOperator::PGNotLikeMatch),
        ("!~~*", BinaryOperator::PGNotILikeMatch),
    ];

    // First pass: the right-hand side is a single pattern.
    for (str_op, op) in &pg_like_match_ops {
        let sql = format!("SELECT 'abc' {str_op} 'a_c%'");
        let select = pg().verified_only_select(&sql);
        let expected = SelectItem::UnnamedExpr(Expr::BinaryOp {
            left: Box::new(text("abc")),
            op: op.clone(),
            right: Box::new(text("a_c%")),
        });
        assert_eq!(expected, select.projection[0]);
    }

    // Second pass: the right-hand side is `ALL` over an array of patterns.
    for (str_op, op) in &pg_like_match_ops {
        let sql = format!("SELECT 'abc' {str_op} ALL(ARRAY['a_c%'])");
        let select = pg().verified_only_select(&sql);
        let patterns = Expr::Array(Array {
            elem: vec![text("a_c%")],
            named: true,
        });
        let expected = SelectItem::UnnamedExpr(Expr::AllOp {
            left: Box::new(text("abc")),
            compare_op: op.clone(),
            right: Box::new(patterns),
        });
        assert_eq!(expected, select.projection[0]);
    }
}

#[test]
fn parse_array_index_expr() {
    // Numeric literal with an empty span.
    let num = |n: u8| Expr::Value(number(&n.to_string()).with_empty_span());
    // `[<index>]` element of an access chain.
    let at = |index: Expr| AccessExpr::Subscript(Subscript::Index { index });
    // Double-quoted identifier used as an expression.
    let quoted = |name: &str| {
        Expr::Identifier(Ident {
            value: name.to_string(),
            quote_style: Some('"'),
            span: Span::empty(),
        })
    };
    // Unquoted identifier, boxed for use as an access root.
    let root = |name: &str| Box::new(Expr::Identifier(Ident::new(name)));

    // One level of subscripting.
    let select = pg_and_generic().verified_only_select("SELECT foo[0] FROM foos");
    assert_eq!(
        &Expr::CompoundFieldAccess {
            root: root("foo"),
            access_chain: vec![at(num(0))],
        },
        expr_from_projection(only(&select.projection)),
    );

    // Two levels of subscripting.
    let select = pg_and_generic().verified_only_select("SELECT foo[0][0] FROM foos");
    assert_eq!(
        &Expr::CompoundFieldAccess {
            root: root("foo"),
            access_chain: vec![at(num(0)), at(num(0))],
        },
        expr_from_projection(only(&select.projection)),
    );

    // Numeric subscript followed by double-quoted identifiers as subscripts.
    let select =
        pg_and_generic().verified_only_select(r#"SELECT bar[0]["baz"]["fooz"] FROM foos"#);
    assert_eq!(
        &Expr::CompoundFieldAccess {
            root: root("bar"),
            access_chain: vec![at(num(0)), at(quoted("baz")), at(quoted("fooz"))],
        },
        expr_from_projection(only(&select.projection)),
    );

    // Subscripting a parenthesized CAST of a nested array literal.
    let select = pg_and_generic()
        .verified_only_select("SELECT (CAST(ARRAY[ARRAY[2, 3]] AS INT[][]))[1][2]");
    let nested_array = Expr::Array(Array {
        elem: vec![Expr::Array(Array {
            elem: vec![num(2), num(3)],
            named: true,
        })],
        named: true,
    });
    let int_array_of_arrays = DataType::Array(ArrayElemTypeDef::SquareBracket(
        Box::new(DataType::Array(ArrayElemTypeDef::SquareBracket(
            Box::new(DataType::Int(None)),
            None,
        ))),
        None,
    ));
    assert_eq!(
        &Expr::CompoundFieldAccess {
            root: Box::new(Expr::Nested(Box::new(Expr::Cast {
                kind: CastKind::Cast,
                expr: Box::new(nested_array),
                data_type: int_array_of_arrays,
                format: None,
            }))),
            access_chain: vec![at(num(1)), at(num(2))],
        },
        expr_from_projection(only(&select.projection)),
    );

    // Empty array literal.
    let select = pg_and_generic().verified_only_select("SELECT ARRAY[]");
    assert_eq!(
        &Expr::Array(Array {
            elem: vec![],
            named: true
        }),
        expr_from_projection(only(&select.projection)),
    );
}

#[test]
fn parse_array_subscript() {
    // Numeric literal expression.
    let num = |digits: &str| Expr::value(number(digits));
    // `array_length(arr) - <digits>`
    let array_length_minus = |digits: &str| Expr::BinaryOp {
        left: Box::new(call(
            "array_length",
            [Expr::Identifier(Ident::new("arr"))],
        )),
        op: BinaryOperator::Minus,
        right: Box::new(num(digits)),
    };
    // `[lower:upper:stride]` subscript; any part may be absent.
    let slice = |lower_bound: Option<Expr>, upper_bound: Option<Expr>, stride: Option<Expr>| {
        Subscript::Slice {
            lower_bound,
            upper_bound,
            stride,
        }
    };

    // Each case pairs an expression with the subscript expected last in its chain.
    let tests = [
        (
            "(ARRAY[1, 2, 3, 4, 5, 6])[2]",
            Subscript::Index { index: num("2") },
        ),
        (
            "(ARRAY[1, 2, 3, 4, 5, 6])[foo]",
            Subscript::Index {
                index: Expr::Identifier(Ident::new("foo")),
            },
        ),
        (
            "(ARRAY[1, 2, 3, 4, 5, 6])[2:5]",
            slice(Some(num("2")), Some(num("5")), None),
        ),
        (
            "(ARRAY[1, 2, 3, 4, 5, 6])[2:5:3]",
            slice(Some(num("2")), Some(num("5")), Some(num("3"))),
        ),
        (
            "arr[array_length(arr) - 3:array_length(arr) - 1]",
            slice(
                Some(array_length_minus("3")),
                Some(array_length_minus("1")),
                None,
            ),
        ),
        (
            "(ARRAY[1, 2, 3, 4, 5, 6])[:5]",
            slice(None, Some(num("5")), None),
        ),
        (
            "(ARRAY[1, 2, 3, 4, 5, 6])[2:]",
            slice(Some(num("2")), None, None),
        ),
        ("(ARRAY[1, 2, 3, 4, 5, 6])[:]", slice(None, None, None)),
    ];

    for (sql, expect) in tests {
        let access_chain = match pg_and_generic().verified_expr(sql) {
            Expr::CompoundFieldAccess { access_chain, .. } => access_chain,
            _ => panic!("expected subscript expr"),
        };
        match access_chain.last() {
            Some(AccessExpr::Subscript(subscript)) => assert_eq!(expect, *subscript),
            _ => panic!("expected subscript"),
        }
    }

    // Two consecutive slices only need to pass `verified_expr`.
    pg_and_generic().verified_expr("schedule[:2][2:]");
}

#[test]
fn parse_array_multi_subscript() {
    // A function call result subscripted first with a slice, then with an index.
    let expr = pg_and_generic().verified_expr("make_array(1, 2, 3)[1:2][2]");

    let num = |digits: &str| Expr::value(number(digits));
    let make_array_call = call("make_array", vec![num("1"), num("2"), num("3")]);
    let slice_one_to_two = AccessExpr::Subscript(Subscript::Slice {
        lower_bound: Some(num("1")),
        upper_bound: Some(num("2")),
        stride: None,
    });
    let index_two = AccessExpr::Subscript(Subscript::Index { index: num("2") });

    assert_eq!(
        Expr::CompoundFieldAccess {
            root: Box::new(make_array_call),
            access_chain: vec![slice_one_to_two, index_two],
        },
        expr,
    );
}

#[test]
fn parse_create_index() {
    // Named index with IF NOT EXISTS and two plain columns. Every field of
    // `CreateIndex` is listed in the pattern; `nulls_distinct` and `predicate`
    // are required to be `None` by the pattern itself.
    let Statement::CreateIndex(CreateIndex {
        name: Some(ObjectName(index_name)),
        table_name: ObjectName(table),
        using,
        columns,
        unique,
        concurrently,
        if_not_exists,
        nulls_distinct: None,
        include,
        with,
        predicate: None,
        index_options,
        alter_options,
    }) = pg().verified_stmt("CREATE INDEX IF NOT EXISTS my_index ON my_table(col1, col2)")
    else {
        unreachable!()
    };

    assert_eq_vec(&["my_index"], &index_name);
    assert_eq_vec(&["my_table"], &table);
    assert_eq!(None, using);
    assert!(!unique);
    assert!(!concurrently);
    assert!(if_not_exists);
    assert_eq_vec(&["col1", "col2"], &columns);
    assert!(include.is_empty());
    assert!(with.is_empty());
    assert!(index_options.is_empty());
    assert!(alter_options.is_empty());
}

#[test]
fn parse_create_anonymous_index() {
    // Index without a name: `name` must come back as `None`. The pattern itself
    // requires `nulls_distinct` and `predicate` to be `None`.
    let Statement::CreateIndex(CreateIndex {
        name: index_name,
        table_name: ObjectName(table),
        using,
        columns,
        unique,
        concurrently,
        if_not_exists,
        include,
        nulls_distinct: None,
        with,
        predicate: None,
        index_options,
        alter_options,
    }) = pg().verified_stmt("CREATE INDEX ON my_table(col1, col2)")
    else {
        unreachable!()
    };

    assert_eq!(None, index_name);
    assert_eq_vec(&["my_table"], &table);
    assert_eq!(None, using);
    assert!(!unique);
    assert!(!concurrently);
    assert!(!if_not_exists);
    assert_eq_vec(&["col1", "col2"], &columns);
    assert!(include.is_empty());
    assert!(with.is_empty());
    assert!(index_options.is_empty());
    assert!(alter_options.is_empty());
}

#[test]
/// Checks that `CREATE INDEX` statements with an optional operator class on an
/// indexed expression are parsed correctly.
///
/// # Implementation details
///
/// The parser library does not deal with the semantics of SQL statements, so it
/// cannot tell whether an operator class exists or whether it is valid for the
/// chosen index type. This test therefore only verifies that the statement
/// parses, and deliberately includes a `totally_not_valid` operator class.
fn parse_create_indices_with_operator_classes() {
    let indices = [
        IndexType::GIN,
        IndexType::GiST,
        IndexType::SPGiST,
        IndexType::Custom("CustomIndexType".into()),
    ];
    let operator_classes: [Option<Ident>; 4] = [
        None,
        Some("gin_trgm_ops".into()),
        Some("gist_trgm_ops".into()),
        Some("totally_not_valid".into()),
    ];

    // Unquoted identifier with an empty span.
    let ident = |name: &str| Ident {
        value: name.to_owned(),
        quote_style: None,
        span: Span::empty(),
    };
    // Ordering options left unspecified.
    let no_ordering = || OrderByOptions {
        asc: None,
        nulls_first: None,
    };

    // Every index type is combined with every operator class (including none).
    for expected_index_type in indices {
        for expected_operator_class in &operator_classes {
            // Text appended after the indexed expression: " <class>" or nothing.
            let operator_class_suffix = match expected_operator_class {
                Some(oc) => format!(" {oc}"),
                None => String::new(),
            };
            let single_column_sql_statement = format!(
                "CREATE INDEX the_index_name ON users USING {expected_index_type} (concat_users_name(first_name, last_name){})",
                operator_class_suffix
            );
            let multi_column_sql_statement = format!(
                "CREATE INDEX the_index_name ON users USING {expected_index_type} (column_name, concat_users_name(first_name, last_name){})",
                operator_class_suffix
            );

            // `concat_users_name(first_name, last_name)` carrying the operator class.
            let expected_function_column = IndexColumn {
                column: OrderByExpr {
                    expr: Expr::Function(Function {
                        name: ObjectName(vec![ObjectNamePart::Identifier(ident(
                            "concat_users_name",
                        ))]),
                        uses_odbc_syntax: false,
                        parameters: FunctionArguments::None,
                        args: FunctionArguments::List(FunctionArgumentList {
                            duplicate_treatment: None,
                            args: vec![
                                FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Identifier(
                                    ident("first_name"),
                                ))),
                                FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Identifier(
                                    ident("last_name"),
                                ))),
                            ],
                            clauses: vec![],
                        }),
                        filter: None,
                        null_treatment: None,
                        over: None,
                        within_group: vec![],
                    }),
                    options: no_ordering(),
                    with_fill: None,
                },
                operator_class: expected_operator_class.clone(),
            };
            // Plain `column_name`, which never carries an operator class.
            let expected_plain_column = IndexColumn {
                column: OrderByExpr {
                    expr: Expr::Identifier(ident("column_name")),
                    options: no_ordering(),
                    with_fill: None,
                },
                operator_class: None,
            };

            // Parses `sql` and checks everything both statement shapes share;
            // `expected_columns` is compared position by position against the
            // leading parsed columns.
            let assert_index = |sql: &str, expected_columns: &[&IndexColumn]| {
                let Statement::CreateIndex(CreateIndex {
                    name: Some(ObjectName(name)),
                    table_name: ObjectName(table_name),
                    using: Some(using),
                    columns,
                    unique: false,
                    concurrently: false,
                    if_not_exists: false,
                    include,
                    nulls_distinct: None,
                    with,
                    predicate: None,
                    index_options,
                    alter_options,
                }) = pg().verified_stmt(sql)
                else {
                    unreachable!()
                };
                assert_eq_vec(&["the_index_name"], &name);
                assert_eq_vec(&["users"], &table_name);
                assert_eq!(expected_index_type, using);
                for (position, expected) in expected_columns.iter().enumerate() {
                    assert_eq!(*expected, &columns[position]);
                }
                assert!(include.is_empty());
                assert!(with.is_empty());
                assert!(index_options.is_empty());
                assert!(alter_options.is_empty());
            };

            assert_index(&single_column_sql_statement, &[&expected_function_column]);
            assert_index(
                &multi_column_sql_statement,
                &[&expected_plain_column, &expected_function_column],
            );
        }
    }
}

/// Checks that `CREATE INDEX ... USING BLOOM (...) WITH (...)` round-trips and that each
/// `key = value` entry of the `WITH` list is parsed as a binary `=` expression.
#[test]
fn parse_create_bloom() {
    let sql =
        "CREATE INDEX bloomidx ON tbloom USING BLOOM (i1, i2, i3) WITH (length = 80, col1 = 2, col2 = 2, col3 = 4)";

    // Every field is spelled out (no `..`) so that adding a field to `CreateIndex`
    // forces this test to be revisited.
    let Statement::CreateIndex(CreateIndex {
        name: Some(ObjectName(index_name)),
        table_name: ObjectName(table),
        using: Some(index_type),
        columns,
        unique: false,
        concurrently: false,
        if_not_exists: false,
        include,
        nulls_distinct: None,
        with: storage_params,
        predicate: None,
        index_options,
        alter_options,
    }) = pg().verified_stmt(sql)
    else {
        unreachable!()
    };

    // One `<name> = <number>` expression per entry of the WITH list, in source order.
    let expected_params: Vec<Expr> = [("length", "80"), ("col1", "2"), ("col2", "2"), ("col3", "4")]
        .iter()
        .map(|&(key, digits)| Expr::BinaryOp {
            left: Box::new(Expr::Identifier(Ident::new(key))),
            op: BinaryOperator::Eq,
            right: Box::new(Expr::Value(number(digits).into())),
        })
        .collect();

    assert_eq_vec(&["bloomidx"], &index_name);
    assert_eq_vec(&["tbloom"], &table);
    assert_eq!(IndexType::Bloom, index_type);
    assert_eq_vec(&["i1", "i2", "i3"], &columns);
    assert!(include.is_empty());
    assert_eq!(expected_params, storage_params);
    assert!(index_options.is_empty());
    assert!(alter_options.is_empty());
}

/// Checks that a single-column `CREATE INDEX ... USING BRIN (...)` round-trips and
/// reports `IndexType::BRIN` with no optional clauses set.
#[test]
fn parse_create_brin() {
    let sql = "CREATE INDEX brin_sensor_data_recorded_at ON sensor_data USING BRIN (recorded_at)";

    // Every field is spelled out (no `..`) so that adding a field to `CreateIndex`
    // forces this test to be revisited.
    let Statement::CreateIndex(CreateIndex {
        name: Some(ObjectName(index_name)),
        table_name: ObjectName(table),
        using: Some(index_type),
        columns,
        unique: false,
        concurrently: false,
        if_not_exists: false,
        include,
        nulls_distinct: None,
        with: storage_params,
        predicate: None,
        index_options,
        alter_options,
    }) = pg().verified_stmt(sql)
    else {
        unreachable!()
    };

    assert_eq_vec(&["brin_sensor_data_recorded_at"], &index_name);
    assert_eq_vec(&["sensor_data"], &table);
    assert_eq!(IndexType::BRIN, index_type);
    assert_eq_vec(&["recorded_at"], &columns);
    assert!(include.is_empty());
    assert!(storage_params.is_empty());
    assert!(index_options.is_empty());
    assert!(alter_options.is_empty());
}

/// Checks that `CREATE TABLE ... INHERITS (...)` round-trips and exposes the parent
/// tables, in order, through `CreateTable::inherits`.
#[test]
fn parse_create_table_with_inherits() {
    // A single schema-qualified parent.
    let single_inheritance_sql =
        "CREATE TABLE child_table (child_column INT) INHERITS (public.parent_table)";
    let Statement::CreateTable(CreateTable {
        inherits: Some(parents),
        ..
    }) = pg().verified_stmt(single_inheritance_sql)
    else {
        unreachable!()
    };
    assert_eq_vec(&["public", "parent_table"], &parents[0].0);

    // Two parents, each schema-qualified.
    let double_inheritance_sql = "CREATE TABLE child_table (child_column INT) INHERITS (public.parent_table, pg_catalog.pg_settings)";
    let Statement::CreateTable(CreateTable {
        inherits: Some(parents),
        ..
    }) = pg().verified_stmt(double_inheritance_sql)
    else {
        unreachable!()
    };
    assert_eq_vec(&["public", "parent_table"], &parents[0].0);
    assert_eq_vec(&["pg_catalog", "pg_settings"], &parents[1].0);
}

/// An empty `INHERITS ()` list must be rejected with a `ParserError::ParserError`.
#[test]
fn parse_create_table_with_empty_inherits_fails() {
    let sql = "CREATE TABLE child_table (child_column INT) INHERITS ()";
    let outcome = pg().parse_sql_statements(sql);
    assert!(matches!(outcome, Err(ParserError::ParserError(_))));
}

/// Checks that `CREATE INDEX CONCURRENTLY IF NOT EXISTS ...` round-trips and sets the
/// `concurrently` and `if_not_exists` flags while leaving every other option unset.
#[test]
fn parse_create_index_concurrently() {
    let sql = "CREATE INDEX CONCURRENTLY IF NOT EXISTS my_index ON my_table(col1, col2)";

    // Every field is spelled out (no `..`) so that adding a field to `CreateIndex`
    // forces this test to be revisited.
    let Statement::CreateIndex(CreateIndex {
        name: Some(ObjectName(index_name)),
        table_name: ObjectName(table),
        using: index_type,
        columns,
        unique: is_unique,
        concurrently: is_concurrent,
        if_not_exists: guarded,
        include,
        nulls_distinct: None,
        with: storage_params,
        predicate: None,
        index_options,
        alter_options,
    }) = pg().verified_stmt(sql)
    else {
        unreachable!()
    };

    assert_eq_vec(&["my_index"], &index_name);
    assert_eq_vec(&["my_table"], &table);
    assert_eq!(None, index_type);
    assert!(!is_unique);
    assert!(is_concurrent);
    assert!(guarded);
    assert_eq_vec(&["col1", "col2"], &columns);
    assert!(include.is_empty());
    assert!(storage_params.is_empty());
    assert!(index_options.is_empty());
    assert!(alter_options.is_empty());
}

/// Checks that a partial index (`CREATE INDEX ... WHERE <predicate>`) round-trips and
/// that the predicate itself is parsed as `col3 IS NULL`, not merely present.
#[test]
fn parse_create_index_with_predicate() {
    let sql = "CREATE INDEX IF NOT EXISTS my_index ON my_table(col1, col2) WHERE col3 IS NULL";
    match pg().verified_stmt(sql) {
        // Every field is spelled out (no `..`) so that adding a field to `CreateIndex`
        // forces this test to be revisited.
        Statement::CreateIndex(CreateIndex {
            name: Some(ObjectName(name)),
            table_name: ObjectName(table_name),
            using,
            columns,
            unique,
            concurrently,
            if_not_exists,
            include,
            nulls_distinct: None,
            with,
            predicate: Some(predicate),
            index_options,
            alter_options,
        }) => {
            assert_eq_vec(&["my_index"], &name);
            assert_eq_vec(&["my_table"], &table_name);
            assert_eq!(None, using);
            assert!(!unique);
            assert!(!concurrently);
            assert!(if_not_exists);
            assert_eq_vec(&["col1", "col2"], &columns);
            assert!(include.is_empty());
            assert!(with.is_empty());
            // Previously only the presence of a predicate was checked; pin its shape too.
            assert_eq!(
                Expr::IsNull(Box::new(Expr::Identifier(Ident::new("col3")))),
                predicate
            );
            assert!(index_options.is_empty());
            assert!(alter_options.is_empty());
        }
        _ => unreachable!(),
    }
}

/// Checks that `CREATE INDEX ... INCLUDE (...)` round-trips and that the included
/// columns are reported separately from the indexed columns.
#[test]
fn parse_create_index_with_include() {
    let sql = "CREATE INDEX IF NOT EXISTS my_index ON my_table(col1, col2) INCLUDE (col3, col4)";

    // Every field is spelled out (no `..`) so that adding a field to `CreateIndex`
    // forces this test to be revisited.
    let Statement::CreateIndex(CreateIndex {
        name: Some(ObjectName(index_name)),
        table_name: ObjectName(table),
        using: index_type,
        columns,
        unique: is_unique,
        concurrently: is_concurrent,
        if_not_exists: guarded,
        include: covering_columns,
        nulls_distinct: None,
        with: storage_params,
        predicate: None,
        index_options,
        alter_options,
    }) = pg().verified_stmt(sql)
    else {
        unreachable!()
    };

    assert_eq_vec(&["my_index"], &index_name);
    assert_eq_vec(&["my_table"], &table);
    assert_eq!(None, index_type);
    assert!(!is_unique);
    assert!(!is_concurrent);
    assert!(guarded);
    assert_eq_vec(&["col1", "col2"], &columns);
    assert_eq_vec(&["col3", "col4"], &covering_columns);
    assert!(storage_params.is_empty());
    assert!(index_options.is_empty());
    assert!(alter_options.is_empty());
}

/// Checks that both `NULLS NOT DISTINCT` and `NULLS DISTINCT` round-trip on
/// `CREATE INDEX` and map to `nulls_distinct: Some(false)` / `Some(true)` respectively.
#[test]
fn parse_create_index_with_nulls_distinct() {
    // (trailing clause, expected value of `nulls_distinct`)
    for (clause, expected_nulls_distinct) in
        [("NULLS NOT DISTINCT", false), ("NULLS DISTINCT", true)]
    {
        let sql =
            format!("CREATE INDEX IF NOT EXISTS my_index ON my_table(col1, col2) {clause}");

        // Every field is spelled out (no `..`) so that adding a field to `CreateIndex`
        // forces this test to be revisited.
        let Statement::CreateIndex(CreateIndex {
            name: Some(ObjectName(name)),
            table_name: ObjectName(table_name),
            using,
            columns,
            unique,
            concurrently,
            if_not_exists,
            include,
            nulls_distinct: Some(nulls_distinct),
            with,
            predicate: None,
            index_options,
            alter_options,
        }) = pg().verified_stmt(&sql)
        else {
            unreachable!()
        };

        assert_eq_vec(&["my_index"], &name);
        assert_eq_vec(&["my_table"], &table_name);
        assert_eq!(None, using);
        assert!(!unique);
        assert!(!concurrently);
        assert!(if_not_exists);
        assert_eq_vec(&["col1", "col2"], &columns);
        assert!(include.is_empty());
        assert_eq!(
            expected_nulls_distinct, nulls_distinct,
            "unexpected nulls_distinct for: {sql}"
        );
        assert!(with.is_empty());
        assert!(index_options.is_empty());
        assert!(alter_options.is_empty());
    }
}

/// Checks that `ARRAY(<subquery>)` is parsed as a call to a function named `ARRAY`
/// whose arguments are the subquery itself (here a `UNION` of two literal selects).
#[test]
fn parse_array_subquery_expr() {
    /// Expected AST for a bare `SELECT <digits>` with every optional clause absent.
    fn select_literal(digits: &str) -> SetExpr {
        SetExpr::Select(Box::new(Select {
            select_token: AttachedToken::empty(),
            distinct: None,
            top: None,
            top_before_distinct: false,
            projection: vec![SelectItem::UnnamedExpr(Expr::Value(
                number(digits).with_empty_span(),
            ))],
            exclude: None,
            into: None,
            from: vec![],
            lateral_views: vec![],
            prewhere: None,
            selection: None,
            group_by: GroupByExpr::Expressions(vec![], vec![]),
            cluster_by: vec![],
            distribute_by: vec![],
            sort_by: vec![],
            having: None,
            named_window: vec![],
            qualify: None,
            window_before_qualify: false,
            value_table_mode: None,
            connect_by: None,
            flavor: SelectFlavor::Standard,
        }))
    }

    let select = pg().verified_only_select("SELECT ARRAY(SELECT 1 UNION SELECT 2)");

    // `SELECT 1 UNION SELECT 2` with no ordering, limits, locks or other trailing clauses.
    let union_query = Query {
        with: None,
        body: Box::new(SetExpr::SetOperation {
            op: SetOperator::Union,
            set_quantifier: SetQuantifier::None,
            left: Box::new(select_literal("1")),
            right: Box::new(select_literal("2")),
        }),
        order_by: None,
        limit_clause: None,
        fetch: None,
        locks: vec![],
        for_clause: None,
        settings: None,
        format_clause: None,
        pipe_operators: vec![],
    };

    let expected = Expr::Function(Function {
        name: ObjectName::from(vec![Ident::new("ARRAY")]),
        uses_odbc_syntax: false,
        parameters: FunctionArguments::None,
        args: FunctionArguments::Subquery(Box::new(union_query)),
        filter: None,
        null_treatment: None,
        over: None,
        within_group: vec![],
    });

    assert_eq!(&expected, expr_from_projection(only(&select.projection)));
}

/// Checks the two `SET ... TRANSACTION` forms exercised here: a snapshot import and a
/// session-wide list of transaction modes.
#[test]
fn test_transaction_statement() {
    // `SET TRANSACTION SNAPSHOT '<id>'`: no modes, snapshot id kept as a quoted string.
    let snapshot_stmt = pg().verified_stmt("SET TRANSACTION SNAPSHOT '000003A1-1'");
    let expected_snapshot = Statement::Set(Set::SetTransaction {
        modes: Vec::new(),
        snapshot: Some(Value::SingleQuotedString("000003A1-1".to_string())),
        session: false,
    });
    assert_eq!(snapshot_stmt, expected_snapshot);

    // `SET SESSION CHARACTERISTICS AS TRANSACTION ...`: modes are kept in source order.
    let session_stmt = pg().verified_stmt("SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY, READ WRITE, ISOLATION LEVEL SERIALIZABLE");
    let expected_session = Statement::Set(Set::SetTransaction {
        modes: vec![
            TransactionMode::AccessMode(TransactionAccessMode::ReadOnly),
            TransactionMode::AccessMode(TransactionAccessMode::ReadWrite),
            TransactionMode::IsolationLevel(TransactionIsolationLevel::Serializable),
        ],
        snapshot: None,
        session: true,
    });
    assert_eq!(session_stmt, expected_session);
}

/// Exercises the PostgreSQL JSON/JSONB binary operators (`->`, `->>`, `#>`, `#>>`, `@>`,
/// `<@`, `#-`, `@?`, `@@`, `?`, `?&`, `?|`) and checks the resulting `Expr::BinaryOp` trees.
#[test]
fn test_json() {
    /// Boxed unquoted identifier.
    fn ident(name: &str) -> Box<Expr> {
        Box::new(Expr::Identifier(Ident::new(name)))
    }

    /// Boxed single-quoted string literal.
    fn text(literal: &str) -> Box<Expr> {
        Box::new(Expr::Value(
            Value::SingleQuotedString(literal.to_string()).with_empty_span(),
        ))
    }

    /// Boxed numeric literal.
    fn num(digits: &str) -> Box<Expr> {
        Box::new(Expr::value(number(digits)))
    }

    /// Boxed `ARRAY['..', ...]` of single-quoted strings.
    fn text_array(items: &[&str]) -> Box<Expr> {
        Box::new(Expr::Array(Array {
            elem: items.iter().map(|item| *text(item)).collect(),
            named: true,
        }))
    }

    /// `left <op> right`.
    fn binary(left: Box<Expr>, op: BinaryOperator, right: Box<Expr>) -> Expr {
        Expr::BinaryOp { left, op, right }
    }

    /// Parses `sql` and compares its first projection item against `expected`.
    #[track_caller]
    fn assert_projection(sql: &str, expected: Expr) {
        let select = pg().verified_only_select(sql);
        assert_eq!(SelectItem::UnnamedExpr(expected), select.projection[0]);
    }

    /// Parses `sql` and compares its `WHERE` expression against `expected`.
    #[track_caller]
    fn assert_selection(sql: &str, expected: Expr) {
        let select = pg().verified_only_select(sql);
        assert_eq!(expected, select.selection.unwrap());
    }

    assert_projection(
        "SELECT params ->> 'name' FROM events",
        binary(ident("params"), BinaryOperator::LongArrow, text("name")),
    );

    assert_projection(
        "SELECT params -> 'name' FROM events",
        binary(ident("params"), BinaryOperator::Arrow, text("name")),
    );

    assert_projection(
        "SELECT info -> 'items' ->> 'product' FROM orders",
        binary(
            Box::new(binary(ident("info"), BinaryOperator::Arrow, text("items"))),
            BinaryOperator::LongArrow,
            text("product"),
        ),
    );

    // the RHS can be a number (array element access)
    assert_projection(
        "SELECT obj -> 42",
        binary(ident("obj"), BinaryOperator::Arrow, num("42")),
    );

    // the RHS can be an identifier
    assert_projection(
        "SELECT obj -> key",
        binary(ident("obj"), BinaryOperator::Arrow, ident("key")),
    );

    // -> operator has lower precedence than arithmetic ops
    assert_projection(
        "SELECT obj -> 3 * 2",
        binary(
            ident("obj"),
            BinaryOperator::Arrow,
            Box::new(binary(num("3"), BinaryOperator::Multiply, num("2"))),
        ),
    );

    assert_projection(
        "SELECT info #> '{a,b,c}' FROM orders",
        binary(ident("info"), BinaryOperator::HashArrow, text("{a,b,c}")),
    );

    assert_projection(
        "SELECT info #>> '{a,b,c}' FROM orders",
        binary(ident("info"), BinaryOperator::HashLongArrow, text("{a,b,c}")),
    );

    assert_selection(
        "SELECT info FROM orders WHERE info @> '{\"a\": 1}'",
        binary(ident("info"), BinaryOperator::AtArrow, text("{\"a\": 1}")),
    );

    assert_selection(
        "SELECT info FROM orders WHERE '{\"a\": 1}' <@ info",
        binary(text("{\"a\": 1}"), BinaryOperator::ArrowAt, ident("info")),
    );

    assert_projection(
        "SELECT info #- ARRAY['a', 'b'] FROM orders",
        binary(
            ident("info"),
            BinaryOperator::HashMinus,
            text_array(&["a", "b"]),
        ),
    );

    assert_selection(
        "SELECT info FROM orders WHERE info @? '$.a'",
        binary(ident("info"), BinaryOperator::AtQuestion, text("$.a")),
    );

    assert_selection(
        "SELECT info FROM orders WHERE info @@ '$.a'",
        binary(ident("info"), BinaryOperator::AtAt, text("$.a")),
    );

    assert_selection(
        r#"SELECT info FROM orders WHERE info ? 'b'"#,
        binary(ident("info"), BinaryOperator::Question, text("b")),
    );

    assert_selection(
        r#"SELECT info FROM orders WHERE info ?& ARRAY['b', 'c']"#,
        binary(
            ident("info"),
            BinaryOperator::QuestionAnd,
            text_array(&["b", "c"]),
        ),
    );

    assert_selection(
        r#"SELECT info FROM orders WHERE info ?| ARRAY['b', 'c']"#,
        binary(
            ident("info"),
            BinaryOperator::QuestionPipe,
            text_array(&["b", "c"]),
        ),
    );
}

/// `JSON_OBJECT('k' : 'v')` must parse as a function call whose single argument is a
/// named argument using the `:` operator.
#[test]
fn json_object_colon_syntax() {
    let expr = pg().verified_expr("JSON_OBJECT('name' : 'value')");

    // Borrow rather than move so the whole expression is still available for the message.
    let Expr::Function(Function {
        args: FunctionArguments::List(FunctionArgumentList { args, .. }),
        ..
    }) = &expr
    else {
        panic!(
            "Expected: JSON_OBJECT('name' : 'value') to be parsed as a function, but got {expr:?}"
        );
    };

    let is_single_colon_arg = matches!(
        args.as_slice(),
        [FunctionArg::ExprNamed {
            operator: FunctionArgOperator::Colon,
            ..
        }]
    );
    assert!(is_single_colon_arg, "Invalid function argument: {args:?}");
}

/// `JSON_OBJECT('k' VALUE 'v')` must parse as a function call whose single argument is a
/// named argument using the `VALUE` operator.
#[test]
fn json_object_value_syntax() {
    let expr = pg().verified_expr("JSON_OBJECT('name' VALUE 'value')");

    // Borrow rather than move so the whole expression is still available for the message.
    let Expr::Function(Function {
        args: FunctionArguments::List(FunctionArgumentList { args, .. }),
        ..
    }) = &expr
    else {
        panic!("Expected: JSON_OBJECT('name' VALUE 'value') to be parsed as a function, but got {expr:?}");
    };

    let is_single_value_arg = matches!(
        args.as_slice(),
        [FunctionArg::ExprNamed {
            operator: FunctionArgOperator::Value,
            ..
        }]
    );
    assert!(is_single_value_arg, "Invalid function argument: {args:?}");
}

/// Checks `JSON_OBJECT(...)` with trailing `NULL ON NULL` / `RETURNING JSONB` clauses,
/// including the form that has a clause but no key/value arguments.
#[test]
fn parse_json_object() {
    /// Parses `sql` and requires a `JSON_OBJECT` call with exactly `expected_clauses`.
    /// When `expect_value_arg` is true the argument list must be a single `VALUE`-named
    /// argument; otherwise it must be empty.
    #[track_caller]
    fn assert_json_object(
        sql: &str,
        expect_value_arg: bool,
        expected_clauses: Vec<FunctionArgumentClause>,
    ) {
        let expr = pg().verified_expr(sql);
        let structure_matches = match &expr {
            Expr::Function(Function {
                name: ObjectName(parts),
                args: FunctionArguments::List(FunctionArgumentList { args, clauses, .. }),
                ..
            }) => {
                let name_ok =
                    *parts == vec![ObjectNamePart::Identifier(Ident::new("JSON_OBJECT"))];
                let args_ok = if expect_value_arg {
                    matches!(
                        args.as_slice(),
                        [FunctionArg::ExprNamed {
                            operator: FunctionArgOperator::Value,
                            ..
                        }]
                    )
                } else {
                    args.is_empty()
                };
                name_ok && args_ok && *clauses == expected_clauses
            }
            _ => false,
        };
        assert!(
            structure_matches,
            "Failed to parse JSON_OBJECT with expected structure, got: {expr:?}"
        );
    }

    assert_json_object(
        "JSON_OBJECT('name' VALUE 'value' NULL ON NULL)",
        true,
        vec![FunctionArgumentClause::JsonNullClause(
            JsonNullClause::NullOnNull,
        )],
    );

    assert_json_object(
        "JSON_OBJECT('name' VALUE 'value' RETURNING JSONB)",
        true,
        vec![FunctionArgumentClause::JsonReturningClause(
            JsonReturningClause {
                data_type: DataType::JSONB,
            },
        )],
    );

    assert_json_object(
        "JSON_OBJECT(RETURNING JSONB)",
        false,
        vec![FunctionArgumentClause::JsonReturningClause(
            JsonReturningClause {
                data_type: DataType::JSONB,
            },
        )],
    );
}

/// `JSON_TABLE` must be usable as an ordinary table name in both tested dialects.
#[test]
fn parse_json_table_is_not_reserved() {
    // JSON_TABLE is not a reserved keyword in PostgreSQL, even though it is in SQL:2023
    // see: https://en.wikipedia.org/wiki/List_of_SQL_reserved_words
    let Select { from: tables, .. } =
        pg_and_generic().verified_only_select("SELECT * FROM JSON_TABLE");
    assert_eq!(1, tables.len());

    let relation = &tables[0].relation;
    let TableFactor::Table {
        name: ObjectName(name_parts),
        ..
    } = relation
    else {
        panic!("Expected: JSON_TABLE to be parsed as a table name, but got {relation:?}");
    };
    assert_eq!(
        ObjectNamePart::Identifier(Ident::new("JSON_TABLE")),
        name_parts[0]
    );
}

/// Checks field access on parenthesised composite values, `(expr).field`, both for a
/// compound identifier root and for a function-call root.
#[test]
fn test_composite_value() {
    /// Expected AST for the parenthesised root `(on_hand.item)`.
    fn on_hand_item() -> Expr {
        Expr::Nested(Box::new(Expr::CompoundIdentifier(vec![
            Ident::new("on_hand"),
            Ident::new("item"),
        ])))
    }

    /// Expected AST for a `.field` access step.
    fn dot(field: &str) -> AccessExpr {
        AccessExpr::Dot(Expr::Identifier(Ident::new(field)))
    }

    /// Single-quoted string literal expression.
    fn quoted(literal: &str) -> Expr {
        Expr::Value(Value::SingleQuotedString(literal.to_string()).with_empty_span())
    }

    let select = pg().verified_only_select(
        "SELECT (on_hand.item).name FROM on_hand WHERE (on_hand.item).price > 9",
    );

    // Projection: `(on_hand.item).name`.
    let projected = &select.projection[0];
    let Expr::CompoundFieldAccess { root, access_chain } = expr_from_projection(projected)
    else {
        unreachable!("expected projection: got {:?}", projected);
    };
    assert_eq!(root.as_ref(), &on_hand_item());
    assert_eq!(access_chain.as_slice(), &[dot("name")]);

    // Filter: `(on_hand.item).price > 9`.
    let expected_filter = Expr::BinaryOp {
        left: Box::new(Expr::CompoundFieldAccess {
            root: Box::new(on_hand_item()),
            access_chain: vec![dot("price")],
        }),
        op: BinaryOperator::Gt,
        right: Box::new(Expr::value(number("9"))),
    };
    assert_eq!(select.selection.as_ref().unwrap(), &expected_filter);

    // The root may also be a parenthesised, schema-qualified function call.
    let select =
        pg().verified_only_select("SELECT (information_schema._pg_expandarray(ARRAY['i', 'i'])).n");

    let array_arg = Expr::Array(Array {
        elem: vec![quoted("i"), quoted("i")],
        named: true,
    });
    let expand_call = Expr::Function(Function {
        name: ObjectName::from(vec![
            Ident::new("information_schema"),
            Ident::new("_pg_expandarray"),
        ]),
        uses_odbc_syntax: false,
        parameters: FunctionArguments::None,
        args: FunctionArguments::List(FunctionArgumentList {
            duplicate_treatment: None,
            args: vec![FunctionArg::Unnamed(FunctionArgExpr::Expr(array_arg))],
            clauses: vec![],
        }),
        null_treatment: None,
        filter: None,
        over: None,
        within_group: vec![],
    });
    let expected = Expr::CompoundFieldAccess {
        root: Box::new(Expr::Nested(Box::new(expand_call))),
        access_chain: vec![dot("n")],
    };
    assert_eq!(&expected, expr_from_projection(&select.projection[0]));
}

/// A doubled `""` inside a double-quoted identifier must round-trip unchanged.
#[test]
fn parse_quoted_identifier() {
    let sql = r#"SELECT "quoted "" ident""#;
    pg_and_generic().verified_stmt(sql);
}

/// Doubled `""` at the very start and end of a double-quoted identifier must round-trip
/// unchanged.
#[test]
fn parse_quoted_identifier_2() {
    let sql = r#"SELECT """quoted ident""""#;
    pg_and_generic().verified_stmt(sql);
}

/// `CREATE LOCAL TEMPORARY TABLE` must round-trip in both tested dialects.
#[test]
fn parse_local_and_global() {
    let sql = "CREATE LOCAL TEMPORARY TABLE table (COL INT)";
    pg_and_generic().verified_stmt(sql);
}

/// Each `ON COMMIT` action accepted on a temporary table must round-trip in both
/// tested dialects.
#[test]
fn parse_on_commit() {
    for sql in [
        "CREATE TEMPORARY TABLE table (COL INT) ON COMMIT PRESERVE ROWS",
        "CREATE TEMPORARY TABLE table (COL INT) ON COMMIT DELETE ROWS",
        "CREATE TEMPORARY TABLE table (COL INT) ON COMMIT DROP",
    ] {
        pg_and_generic().verified_stmt(sql);
    }
}

fn pg() -> TestedDialects {
    TestedDialects::new(vec![Box::new(PostgreSqlDialect {})])
}

fn pg_and_generic() -> TestedDialects {
    TestedDialects::new(vec![
        Box::new(PostgreSqlDialect {}),
        Box::new(GenericDialect {}),
    ])
}

/// Exercises `E'...'` escape string literals on both the PostgreSQL and the
/// generic dialect: backslash escapes, Unicode / hex / octal escapes, and
/// inputs that must be rejected by the parser.
#[test]
fn parse_escaped_literal_string() {
    // Every rejected input in this test is expected to fail with this message.
    const UNTERMINATED: &str =
        "sql parser error: Unterminated encoded string literal at Line: 1, Column: 8";

    /// Expected AST node for an escape literal whose decoded text is `decoded`.
    fn escaped(decoded: &str) -> Expr {
        Expr::Value((Value::EscapedStringLiteral(decoded.to_string())).with_empty_span())
    }

    /// Asserts that each statement fails to parse with the `UNTERMINATED` message.
    fn assert_rejected(sqls: &[&str]) {
        for sql in sqls {
            let message = pg_and_generic()
                .parse_sql_statements(sql)
                .unwrap_err()
                .to_string();
            assert_eq!(message, UNTERMINATED);
        }
    }

    // Plain backslash escapes: newline, escaped backslash, escaped quote.
    let sql = r"SELECT E's1 \n s1', E's2 \\n s2', E's3 \\\n s3', E's4 \\\\n s4', E'\'', E'foo \\'";
    let select = pg_and_generic().verified_only_select(sql);
    assert_eq!(6, select.projection.len());
    let decoded = [
        "s1 \n s1",
        "s2 \\n s2",
        "s3 \\\n s3",
        "s4 \\\\n s4",
        "'",
        "foo \\",
    ];
    for (position, text) in decoded.iter().copied().enumerate() {
        assert_eq!(
            &escaped(text),
            expr_from_projection(&select.projection[position])
        );
    }

    // An escaped closing quote leaves the literal open.
    assert_rejected(&[r"SELECT E'\'"]);

    // Unicode (`\u`, `\U`), hexadecimal (`\x`) and octal escapes. The canonical
    // SQL passed to the helper is the empty string, exactly as before.
    let sql = r"SELECT E'\u0001', E'\U0010FFFF', E'\xC', E'\x25', E'\2', E'\45', E'\445'";
    let canonical = "";
    let select = pg_and_generic().verified_only_select_with_canonical(sql, canonical);
    assert_eq!(7, select.projection.len());
    let decoded = [
        "\u{0001}",
        "\u{10ffff}",
        "\u{000c}",
        "%",
        "\u{0002}",
        "%",
        "%",
    ];
    for (position, text) in decoded.iter().copied().enumerate() {
        assert_eq!(
            &escaped(text),
            expr_from_projection(&select.projection[position])
        );
    }

    // Escapes that the parser must refuse.
    assert_rejected(&[
        r"SELECT E'\u0000'",
        r"SELECT E'\U00110000'",
        r"SELECT E'\u{0001}'",
        r"SELECT E'\xCAD'",
        r"SELECT E'\080'",
    ]);
}

/// Checks the `DECLARE ... CURSOR` variants with `verified_stmt` on both the
/// PostgreSQL and the generic dialect.
#[test]
fn parse_declare() {
    // Each entry is the option text that sits between the cursor name and `FOR`.
    let cursor_options = [
        "CURSOR WITH HOLD",
        "CURSOR WITHOUT HOLD",
        "BINARY CURSOR",
        "ASENSITIVE CURSOR",
        "INSENSITIVE CURSOR",
        "SCROLL CURSOR",
        "NO SCROLL CURSOR",
    ];
    for options in cursor_options {
        let sql = format!(r#"DECLARE "SQL_CUR0x7fa44801bc00" {options} FOR SELECT 1"#);
        pg_and_generic().verified_stmt(&sql);
    }

    // Several options combined, with a more involved query.
    let combined = r#"DECLARE "SQL_CUR0x7fa44801bc00" BINARY INSENSITIVE SCROLL CURSOR WITH HOLD FOR SELECT * FROM table_name LIMIT 2222"#;
    pg_and_generic().verified_stmt(combined);
}

/// `CURRENT_CATALOG`, `CURRENT_USER`, `SESSION_USER` and `USER`, written
/// without parentheses, are each expected to parse as a `Function` whose
/// `parameters` and `args` are both `FunctionArguments::None`.
#[test]
fn parse_current_functions() {
    let sql = "SELECT CURRENT_CATALOG, CURRENT_USER, SESSION_USER, USER";
    let select = pg_and_generic().verified_only_select(sql);

    // Listed in the same order as the projection above.
    let function_names = ["CURRENT_CATALOG", "CURRENT_USER", "SESSION_USER", "USER"];
    for (position, function_name) in function_names.iter().copied().enumerate() {
        let expected = Expr::Function(Function {
            name: ObjectName::from(vec![Ident::new(function_name)]),
            uses_odbc_syntax: false,
            parameters: FunctionArguments::None,
            args: FunctionArguments::None,
            null_treatment: None,
            filter: None,
            over: None,
            within_group: vec![],
        });
        assert_eq!(
            &expected,
            expr_from_projection(&select.projection[position])
        );
    }
}

/// Checks `FETCH` statements with `verified_stmt` on both the PostgreSQL and
/// the generic dialect: once without an `INTO` target, then once per
/// direction with an `INTO` target.
#[test]
fn parse_fetch() {
    // Without an INTO target.
    pg_and_generic().verified_stmt(r#"FETCH 2048 IN "SQL_CUR0x7fa44801bc00""#);

    // One entry per direction; the `ALL` case used to be asserted twice and
    // now appears exactly once.
    let directions = [
        "2048",
        "NEXT",
        "PRIOR",
        "FIRST",
        "LAST",
        "ABSOLUTE 2048",
        "RELATIVE 2048",
        "ALL",
        "FORWARD 2048",
        "FORWARD ALL",
        "BACKWARD 2048",
        "BACKWARD ALL",
    ];
    for direction in directions {
        let sql = format!(r#"FETCH {direction} IN "SQL_CUR0x7fa44801bc00" INTO "new_table""#);
        pg_and_generic().verified_stmt(&sql);
    }
}

/// `OPERATOR(...)` is expected to parse into
/// `BinaryOperator::PGCustomBinaryOperator` holding the dot-separated parts of
/// the operator name, with or without database / schema qualification.
#[test]
fn parse_custom_operator() {
    // (operator text as written inside `OPERATOR(...)`, expected name parts)
    let cases = [
        // operator with a database and schema
        ("database.pg_catalog.~", vec!["database", "pg_catalog", "~"]),
        // operator with a schema
        ("pg_catalog.~", vec!["pg_catalog", "~"]),
        // custom operator without a schema
        ("~", vec!["~"]),
    ];

    for (operator, parts) in cases {
        let sql = format!("SELECT * FROM events WHERE relname OPERATOR({operator}) '^(table)$'");
        let select = pg().verified_only_select(&sql);

        let expected = Expr::BinaryOp {
            left: Box::new(Expr::Identifier(Ident::new("relname"))),
            op: BinaryOperator::PGCustomBinaryOperator(
                parts.into_iter().map(String::from).collect(),
            ),
            right: Box::new(Expr::Value(
                (Value::SingleQuotedString("^(table)$".into())).with_empty_span(),
            )),
        };
        assert_eq!(select.selection, Some(expected));
    }
}

/// Covers `CREATE ROLE` on the PostgreSQL dialect: multiple names with
/// `IF NOT EXISTS`, the optional `WITH` keyword, a statement that combines
/// many options, `USER` / `ROLE` member lists, and rejection of an option
/// given together with its `NO`-prefixed counterpart.
#[test]
fn parse_create_role() {
    // Several role names plus IF NOT EXISTS.
    if let Statement::CreateRole(role) =
        pg().verified_stmt("CREATE ROLE IF NOT EXISTS mysql_a, mysql_b")
    {
        assert_eq_vec(&["mysql_a", "mysql_b"], &role.names);
        assert!(role.if_not_exists);
    } else {
        unreachable!()
    }

    // The same options written without and with the WITH keyword.
    for sql in [
        "CREATE ROLE abc LOGIN PASSWORD NULL",
        "CREATE ROLE abc WITH LOGIN PASSWORD NULL",
    ] {
        let parsed = pg().parse_sql_statements(sql);
        match parsed.as_deref() {
            Ok([Statement::CreateRole(role)]) => {
                assert_eq_vec(&["abc"], &role.names);
                assert_eq!(role.login, Some(true));
                assert_eq!(role.password, Some(Password::NullPassword));
            }
            other => panic!("Failed to parse CREATE ROLE test case: {other:?}"),
        }
    }

    // Roundtrip order of optional parameters is not preserved, so this
    // statement is only parsed, not verified against its serialization.
    let all_options = "CREATE ROLE magician WITH SUPERUSER CREATEROLE NOCREATEDB BYPASSRLS INHERIT PASSWORD 'abcdef' LOGIN VALID UNTIL '2025-01-01' IN ROLE role1, role2 ROLE role3 ADMIN role4, role5 REPLICATION";
    let parsed = pg().parse_sql_statements(all_options);
    match parsed.as_deref() {
        Ok([Statement::CreateRole(role)]) => {
            assert_eq_vec(&["magician"], &role.names);
            assert!(!role.if_not_exists);
            assert_eq!(role.login, Some(true));
            assert_eq!(role.inherit, Some(true));
            assert_eq!(role.bypassrls, Some(true));
            let expected_password = Password::Password(Expr::Value(
                (Value::SingleQuotedString("abcdef".into())).with_empty_span(),
            ));
            assert_eq!(role.password, Some(expected_password));
            assert_eq!(role.superuser, Some(true));
            assert_eq!(role.create_db, Some(false));
            assert_eq!(role.create_role, Some(true));
            assert_eq!(role.replication, Some(true));
            assert_eq!(role.connection_limit, None);
            let expected_valid_until = Expr::Value(
                (Value::SingleQuotedString("2025-01-01".into())).with_empty_span(),
            );
            assert_eq!(role.valid_until, Some(expected_valid_until));
            assert_eq_vec(&["role1", "role2"], &role.in_role);
            assert!(role.in_group.is_empty());
            assert_eq_vec(&["role3"], &role.role);
            assert_eq_vec(&["role4", "role5"], &role.admin);
            assert_eq!(role.authorization_owner, None);
        }
        other => panic!("Failed to parse CREATE ROLE test case: {other:?}"),
    }

    // USER and ROLE member lists (note the trailing space in the input).
    let parsed = pg().parse_sql_statements("CREATE ROLE abc WITH USER foo, bar ROLE baz ");
    match parsed.as_deref() {
        Ok([Statement::CreateRole(role)]) => {
            assert_eq_vec(&["abc"], &role.names);
            assert_eq_vec(&["foo", "bar"], &role.user);
            assert_eq_vec(&["baz"], &role.role);
        }
        other => panic!("Failed to parse CREATE ROLE test case: {other:?}"),
    }

    // An option followed by its NO-prefixed form must be rejected.
    for negatable_kw in [
        "BYPASSRLS",
        "CREATEDB",
        "CREATEROLE",
        "INHERIT",
        "LOGIN",
        "REPLICATION",
        "SUPERUSER",
    ] {
        let sql = format!("CREATE ROLE abc {negatable_kw} NO{negatable_kw}");
        assert!(
            pg().parse_sql_statements(&sql).is_err(),
            "Should not be able to parse CREATE ROLE containing both negated and non-negated versions of the same keyword: {negatable_kw}"
        );
    }
}

/// Covers `ALTER ROLE` on the PostgreSQL dialect: `RENAME TO`, `WITH` option
/// lists (positive and `NO`-prefixed), `SET` with `FROM CURRENT`, `=`, `TO`
/// and `TO DEFAULT`, and `RESET`, each with and without `IN DATABASE`.
#[test]
fn parse_alter_role() {
    /// Single-part, unquoted object name.
    fn object_name(part: &str) -> ObjectName {
        ObjectName::from(vec![Ident::new(part)])
    }

    /// Expected `ALTER ROLE` statement for the unquoted role `role`.
    fn alter_role(role: &str, operation: AlterRoleOperation) -> Statement {
        Statement::AlterRole {
            name: Ident::new(role),
            operation,
        }
    }

    /// Expected single-quoted string literal expression.
    fn quoted(text: &str) -> Expr {
        Expr::Value((Value::SingleQuotedString(text.into())).with_empty_span())
    }

    // RENAME TO
    assert_eq!(
        pg().verified_stmt("ALTER ROLE old_name RENAME TO new_name"),
        alter_role(
            "old_name",
            AlterRoleOperation::RenameRole {
                role_name: Ident::new("new_name"),
            },
        )
    );

    // WITH: every positive option, in the order written.
    let sql = "ALTER ROLE role_name WITH SUPERUSER CREATEDB CREATEROLE INHERIT LOGIN REPLICATION BYPASSRLS CONNECTION LIMIT 100 PASSWORD 'abcdef' VALID UNTIL '2025-01-01'";
    assert_eq!(
        pg().verified_stmt(sql),
        alter_role(
            "role_name",
            AlterRoleOperation::WithOptions {
                options: vec![
                    RoleOption::SuperUser(true),
                    RoleOption::CreateDB(true),
                    RoleOption::CreateRole(true),
                    RoleOption::Inherit(true),
                    RoleOption::Login(true),
                    RoleOption::Replication(true),
                    RoleOption::BypassRLS(true),
                    RoleOption::ConnectionLimit(Expr::value(number("100"))),
                    RoleOption::Password(Password::Password(quoted("abcdef"))),
                    RoleOption::ValidUntil(quoted("2025-01-01")),
                ],
            },
        )
    );

    // WITH: every NO-prefixed option and a NULL password.
    let sql = "ALTER ROLE role_name WITH NOSUPERUSER NOCREATEDB NOCREATEROLE NOINHERIT NOLOGIN NOREPLICATION NOBYPASSRLS PASSWORD NULL";
    assert_eq!(
        pg().verified_stmt(sql),
        alter_role(
            "role_name",
            AlterRoleOperation::WithOptions {
                options: vec![
                    RoleOption::SuperUser(false),
                    RoleOption::CreateDB(false),
                    RoleOption::CreateRole(false),
                    RoleOption::Inherit(false),
                    RoleOption::Login(false),
                    RoleOption::Replication(false),
                    RoleOption::BypassRLS(false),
                    RoleOption::Password(Password::NullPassword),
                ],
            },
        )
    );

    // SET ... FROM CURRENT
    assert_eq!(
        pg().verified_stmt("ALTER ROLE role_name SET maintenance_work_mem FROM CURRENT"),
        alter_role(
            "role_name",
            AlterRoleOperation::Set {
                config_name: object_name("maintenance_work_mem"),
                config_value: SetConfigValue::FromCurrent,
                in_database: None,
            },
        )
    );

    // SET ... = value: this form is only parsed (`parse_sql_statements`), not
    // passed through `verified_stmt` like the other cases.
    let sql = "ALTER ROLE role_name IN DATABASE database_name SET maintenance_work_mem = 100000";
    assert_eq!(
        pg().parse_sql_statements(sql).unwrap(),
        [alter_role(
            "role_name",
            AlterRoleOperation::Set {
                config_name: object_name("maintenance_work_mem"),
                config_value: SetConfigValue::Value(Expr::Value(
                    (number("100000")).with_empty_span()
                )),
                in_database: Some(object_name("database_name")),
            },
        )]
    );

    // SET ... TO value
    let sql = "ALTER ROLE role_name IN DATABASE database_name SET maintenance_work_mem TO 100000";
    assert_eq!(
        pg().verified_stmt(sql),
        alter_role(
            "role_name",
            AlterRoleOperation::Set {
                config_name: object_name("maintenance_work_mem"),
                config_value: SetConfigValue::Value(Expr::Value(
                    (number("100000")).with_empty_span()
                )),
                in_database: Some(object_name("database_name")),
            },
        )
    );

    // SET ... TO DEFAULT
    let sql = "ALTER ROLE role_name IN DATABASE database_name SET maintenance_work_mem TO DEFAULT";
    assert_eq!(
        pg().verified_stmt(sql),
        alter_role(
            "role_name",
            AlterRoleOperation::Set {
                config_name: object_name("maintenance_work_mem"),
                config_value: SetConfigValue::Default,
                in_database: Some(object_name("database_name")),
            },
        )
    );

    // RESET ALL
    assert_eq!(
        pg().verified_stmt("ALTER ROLE role_name RESET ALL"),
        alter_role(
            "role_name",
            AlterRoleOperation::Reset {
                config_name: ResetConfig::ALL,
                in_database: None,
            },
        )
    );

    // RESET a single parameter, scoped to a database.
    let sql = "ALTER ROLE role_name IN DATABASE database_name RESET maintenance_work_mem";
    assert_eq!(
        pg().verified_stmt(sql),
        alter_role(
            "role_name",
            AlterRoleOperation::Reset {
                config_name: ResetConfig::ConfigName(object_name("maintenance_work_mem")),
                in_database: Some(object_name("database_name")),
            },
        )
    );
}

/// Double-quoted identifiers are expected to keep their quote style wherever
/// they appear: table name, table alias, compound column reference, function
/// name, plain column and column alias.
#[test]
fn parse_delimited_identifiers() {
    // Shorthand for a double-quoted identifier.
    let quoted = |text: &str| Ident::with_quote('"', text);

    // check that quoted identifiers in any position remain quoted after serialization
    let select = pg().verified_only_select(
        r#"SELECT "alias"."bar baz", "myfun"(), "simple id" AS "column alias" FROM "a table" AS "alias""#,
    );

    // check FROM
    if let TableFactor::Table {
        name,
        alias,
        args,
        with_hints,
        version,
        ..
    } = only(select.from).relation
    {
        assert_eq!(ObjectName::from(vec![quoted("a table")]), name);
        assert_eq!(quoted("alias"), alias.unwrap().name);
        assert!(args.is_none());
        assert!(with_hints.is_empty());
        assert!(version.is_none());
    } else {
        panic!("Expecting TableFactor::Table")
    }

    // check SELECT
    assert_eq!(3, select.projection.len());

    let compound = Expr::CompoundIdentifier(vec![quoted("alias"), quoted("bar baz")]);
    assert_eq!(&compound, expr_from_projection(&select.projection[0]));

    let call = Expr::Function(Function {
        name: ObjectName::from(vec![quoted("myfun")]),
        uses_odbc_syntax: false,
        parameters: FunctionArguments::None,
        args: FunctionArguments::List(FunctionArgumentList {
            duplicate_treatment: None,
            args: vec![],
            clauses: vec![],
        }),
        null_treatment: None,
        filter: None,
        over: None,
        within_group: vec![],
    });
    assert_eq!(&call, expr_from_projection(&select.projection[1]));

    if let SelectItem::ExprWithAlias { expr, alias } = &select.projection[2] {
        assert_eq!(&Expr::Identifier(quoted("simple id")), expr);
        assert_eq!(&quoted("column alias"), alias);
    } else {
        panic!("Expected: ExprWithAlias")
    }

    // Quoted identifiers in DDL and DML statements.
    for sql in [
        r#"CREATE TABLE "foo" ("bar" "int")"#,
        r#"ALTER TABLE foo ADD CONSTRAINT "bar" PRIMARY KEY (baz)"#,
        r#"UPDATE foo SET "bar" = 5"#,
    ] {
        pg().verified_stmt(sql);
    }
}

/// Column names that are also SQL keywords (`where`, `create`, `sort`, ...)
/// are used as `UPDATE ... SET` assignment targets; the multi-line input is
/// expected to serialize to the single-line canonical form below.
#[test]
fn parse_update_has_keyword() {
    let sql = r#"UPDATE test SET name=$1,
                value=$2,
                where=$3,
                create=$4,
                is_default=$5,
                classification=$6,
                sort=$7
                WHERE id=$8"#;
    let canonical = r#"UPDATE test SET name = $1, value = $2, where = $3, create = $4, is_default = $5, classification = $6, sort = $7 WHERE id = $8"#;
    pg().one_statement_parses_to(sql, canonical);
}

/// An `UPDATE ... RETURNING` statement used as the body of a `WITH` clause is
/// checked with `verified_stmt` on both the PostgreSQL and the generic dialect.
#[test]
fn parse_update_in_with_subquery() {
    let sql = r#"WITH "result" AS (UPDATE "Hero" SET "name" = 'Captain America', "number_of_movies" = "number_of_movies" + 1 WHERE "secret_identity" = 'Sam Wilson' RETURNING "id", "name", "secret_identity", "number_of_movies") SELECT * FROM "result""#;
    pg_and_generic().verified_stmt(sql);
}

/// `CREATE OR REPLACE FUNCTION` with named arguments, `RETURNS BOOLEAN`,
/// `LANGUAGE plpgsql` and a `$$`-quoted body, checked on both the PostgreSQL
/// and the generic dialect, followed by one argument list that the PostgreSQL
/// dialect must reject.
#[test]
fn parser_create_function_with_args() {
    /// Expected AST for
    /// `CREATE OR REPLACE FUNCTION <name>(<args>) RETURNS BOOLEAN LANGUAGE plpgsql AS $$<body>$$`.
    fn expected_function(name: &str, args: Vec<OperateFunctionArg>, body: &str) -> Statement {
        Statement::CreateFunction(CreateFunction {
            or_alter: false,
            or_replace: true,
            temporary: false,
            name: ObjectName::from(vec![Ident::new(name)]),
            args: Some(args),
            return_type: Some(DataType::Boolean),
            language: Some("plpgsql".into()),
            behavior: None,
            called_on_null: None,
            parallel: None,
            function_body: Some(CreateFunctionBody::AsBeforeOptions {
                body: Expr::Value(
                    (Value::DollarQuotedString(DollarQuotedString {
                        value: body.to_owned(),
                        tag: None,
                    }))
                    .with_empty_span(),
                ),
                link_symbol: None,
            }),
            if_not_exists: false,
            using: None,
            determinism_specifier: None,
            options: None,
            remote_connection: None,
        })
    }

    // Two VARCHAR arguments.
    let sql1 = r#"CREATE OR REPLACE FUNCTION check_strings_different(str1 VARCHAR, str2 VARCHAR) RETURNS BOOLEAN LANGUAGE plpgsql AS $$
BEGIN
    IF str1 <> str2 THEN
        RETURN TRUE;
    ELSE
        RETURN FALSE;
    END IF;
END;
$$"#;
    assert_eq!(
        pg_and_generic().verified_stmt(sql1),
        expected_function(
            "check_strings_different",
            vec![
                OperateFunctionArg::with_name("str1", DataType::Varchar(None)),
                OperateFunctionArg::with_name("str2", DataType::Varchar(None)),
            ],
            "\nBEGIN\n    IF str1 <> str2 THEN\n        RETURN TRUE;\n    ELSE\n        RETURN FALSE;\n    END IF;\nEND;\n",
        )
    );

    // A single INT argument.
    let sql2 = r#"CREATE OR REPLACE FUNCTION check_not_zero(int1 INT) RETURNS BOOLEAN LANGUAGE plpgsql AS $$
BEGIN
    IF int1 <> 0 THEN
        RETURN TRUE;
    ELSE
        RETURN FALSE;
    END IF;
END;
$$"#;
    assert_eq!(
        pg_and_generic().verified_stmt(sql2),
        expected_function(
            "check_not_zero",
            vec![OperateFunctionArg::with_name("int1", DataType::Int(None))],
            "\nBEGIN\n    IF int1 <> 0 THEN\n        RETURN TRUE;\n    ELSE\n        RETURN FALSE;\n    END IF;\nEND;\n",
        )
    );

    // Two INT arguments with single-letter names.
    let sql3 = r#"CREATE OR REPLACE FUNCTION check_values_different(a INT, b INT) RETURNS BOOLEAN LANGUAGE plpgsql AS $$
BEGIN
    IF a <> b THEN
        RETURN TRUE;
    ELSE
        RETURN FALSE;
    END IF;
END;
$$"#;
    assert_eq!(
        pg_and_generic().verified_stmt(sql3),
        expected_function(
            "check_values_different",
            vec![
                OperateFunctionArg::with_name("a", DataType::Int(None)),
                OperateFunctionArg::with_name("b", DataType::Int(None)),
            ],
            "\nBEGIN\n    IF a <> b THEN\n        RETURN TRUE;\n    ELSE\n        RETURN FALSE;\n    END IF;\nEND;\n",
        )
    );

    // Two INT arguments with longer names.
    let sql4 = r#"CREATE OR REPLACE FUNCTION check_values_different(int1 INT, int2 INT) RETURNS BOOLEAN LANGUAGE plpgsql AS $$
BEGIN
    IF int1 <> int2 THEN
        RETURN TRUE;
    ELSE
        RETURN FALSE;
    END IF;
END;
$$"#;
    assert_eq!(
        pg_and_generic().verified_stmt(sql4),
        expected_function(
            "check_values_different",
            vec![
                OperateFunctionArg::with_name("int1", DataType::Int(None)),
                OperateFunctionArg::with_name("int2", DataType::Int(None)),
            ],
            "\nBEGIN\n    IF int1 <> int2 THEN\n        RETURN TRUE;\n    ELSE\n        RETURN FALSE;\n    END IF;\nEND;\n",
        )
    );

    // A multi-word argument type and an indented body.
    let sql5 = r#"CREATE OR REPLACE FUNCTION foo(a TIMESTAMP WITH TIME ZONE, b VARCHAR) RETURNS BOOLEAN LANGUAGE plpgsql AS $$
    BEGIN
        RETURN TRUE;
    END;
    $$"#;
    assert_eq!(
        pg_and_generic().verified_stmt(sql5),
        expected_function(
            "foo",
            vec![
                OperateFunctionArg::with_name(
                    "a",
                    DataType::Timestamp(None, TimezoneInfo::WithTimeZone),
                ),
                OperateFunctionArg::with_name("b", DataType::Varchar(None)),
            ],
            "\n    BEGIN\n        RETURN TRUE;\n    END;\n    ",
        )
    );

    // This argument list must not parse on the PostgreSQL dialect.
    let incorrect_sql = "CREATE FUNCTION add(function(struct<a,b> int64), b INTEGER) RETURNS INTEGER LANGUAGE SQL IMMUTABLE STRICT PARALLEL SAFE AS 'select $1 + $2;'";
    assert!(pg().parse_sql_statements(incorrect_sql).is_err());
}

/// `CREATE FUNCTION` with unnamed `INTEGER` arguments, the `IMMUTABLE`,
/// `STRICT` and `PARALLEL SAFE` attributes, and a single-quoted body, checked
/// on both the PostgreSQL and the generic dialect.
#[test]
fn parse_create_function() {
    let sql = "CREATE FUNCTION add(INTEGER, INTEGER) RETURNS INTEGER LANGUAGE SQL IMMUTABLE STRICT PARALLEL SAFE AS 'select $1 + $2;'";

    let unnamed_integer = || OperateFunctionArg::unnamed(DataType::Integer(None));
    let body = Expr::Value((Value::SingleQuotedString("select $1 + $2;".into())).with_empty_span());

    let expected = Statement::CreateFunction(CreateFunction {
        or_alter: false,
        or_replace: false,
        temporary: false,
        name: ObjectName::from(vec![Ident::new("add")]),
        args: Some(vec![unnamed_integer(), unnamed_integer()]),
        return_type: Some(DataType::Integer(None)),
        language: Some("SQL".into()),
        behavior: Some(FunctionBehavior::Immutable),
        called_on_null: Some(FunctionCalledOnNull::Strict),
        parallel: Some(FunctionParallel::Safe),
        function_body: Some(CreateFunctionBody::AsBeforeOptions {
            body,
            link_symbol: None,
        }),
        if_not_exists: false,
        using: None,
        determinism_specifier: None,
        options: None,
        remote_connection: None,
    });

    assert_eq!(pg_and_generic().verified_stmt(sql), expected);
}

/// Runs a series of `CREATE OR REPLACE FUNCTION` variants (behavior,
/// null-handling and parallel attributes, `RETURN` expressions, dollar-quoted
/// bodies, `RETURNS TABLE`) through `verified_stmt`, then checks that a
/// `DEFAULT` argument value is expected to be rendered with `=`.
#[test]
fn parse_create_function_detailed() {
    let verified = [
        "CREATE OR REPLACE FUNCTION add(a INTEGER, IN b INTEGER = 1) RETURNS INTEGER LANGUAGE SQL IMMUTABLE PARALLEL RESTRICTED RETURN a + b",
        "CREATE OR REPLACE FUNCTION add(a INTEGER, IN b INTEGER = 1) RETURNS INTEGER LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT PARALLEL RESTRICTED RETURN a + b",
        "CREATE OR REPLACE FUNCTION add(a INTEGER, IN b INTEGER = 1) RETURNS INTEGER LANGUAGE SQL STABLE PARALLEL UNSAFE RETURN a + b",
        "CREATE OR REPLACE FUNCTION add(a INTEGER, IN b INTEGER = 1) RETURNS INTEGER LANGUAGE SQL STABLE CALLED ON NULL INPUT PARALLEL UNSAFE RETURN a + b",
        r#"CREATE OR REPLACE FUNCTION increment(i INTEGER) RETURNS INTEGER LANGUAGE plpgsql AS $$ BEGIN RETURN i + 1; END; $$"#,
        r#"CREATE OR REPLACE FUNCTION no_arg() RETURNS VOID LANGUAGE plpgsql AS $$ BEGIN DELETE FROM my_table; END; $$"#,
        r#"CREATE OR REPLACE FUNCTION return_table(i INTEGER) RETURNS TABLE(id UUID, is_active BOOLEAN) LANGUAGE plpgsql AS $$ BEGIN RETURN QUERY SELECT NULL::UUID, NULL::BOOLEAN; END; $$"#,
    ];
    for sql in verified {
        pg_and_generic().verified_stmt(sql);
    }

    // `DEFAULT 1` in the input corresponds to `= 1` in the expected output.
    let with_default_keyword =
        "CREATE FUNCTION add(INTEGER, INTEGER DEFAULT 1) RETURNS INTEGER AS 'select $1 + $2;'";
    let with_equals_sign =
        "CREATE FUNCTION add(INTEGER, INTEGER = 1) RETURNS INTEGER AS 'select $1 + $2;'";
    pg_and_generic().one_statement_parses_to(with_default_keyword, with_equals_sign);
}

/// `PARALLEL BLAH` is not an accepted parallel mode, so parsing must fail.
#[test]
fn parse_incorrect_create_function_parallel() {
    let outcome = pg().parse_sql_statements(
        "CREATE FUNCTION add(INTEGER, INTEGER) RETURNS INTEGER LANGUAGE SQL PARALLEL BLAH AS 'select $1 + $2;'",
    );
    assert!(outcome.is_err());
}

/// A C-language `CREATE FUNCTION` whose `AS` clause carries two string
/// literals: the first is expected as the function body and the second as the
/// `link_symbol`. Also checks a statement with the attributes written in a
/// different order against the expected output string.
#[test]
fn parse_create_function_c_with_module_pathname() {
    // Small constructors for the literal and type shapes used more than once.
    let quoted = |text: &str| {
        Expr::Value(Value::SingleQuotedString(text.to_string()).with_empty_span())
    };
    let custom = |type_name: &str| {
        DataType::Custom(ObjectName::from(vec![Ident::new(type_name)]), vec![])
    };

    let sql = "CREATE FUNCTION cas_in(input cstring) RETURNS cas LANGUAGE c IMMUTABLE PARALLEL SAFE AS 'MODULE_PATHNAME', 'cas_in_wrapper'";
    let expected = Statement::CreateFunction(CreateFunction {
        or_alter: false,
        or_replace: false,
        temporary: false,
        name: ObjectName::from(vec![Ident::new("cas_in")]),
        args: Some(vec![OperateFunctionArg::with_name(
            "input",
            custom("cstring"),
        )]),
        return_type: Some(custom("cas")),
        language: Some(Ident::new("c")),
        behavior: Some(FunctionBehavior::Immutable),
        called_on_null: None,
        parallel: Some(FunctionParallel::Safe),
        function_body: Some(CreateFunctionBody::AsBeforeOptions {
            body: quoted("MODULE_PATHNAME"),
            link_symbol: Some(quoted("cas_in_wrapper")),
        }),
        if_not_exists: false,
        using: None,
        determinism_specifier: None,
        options: None,
        remote_connection: None,
    });
    assert_eq!(pg_and_generic().verified_stmt(sql), expected);

    // Test that attribute order flexibility works (IMMUTABLE before LANGUAGE)
    let reordered = "CREATE FUNCTION cas_in(input cstring) RETURNS cas IMMUTABLE PARALLEL SAFE LANGUAGE c AS 'MODULE_PATHNAME', 'cas_in_wrapper'";
    let canonical = "CREATE FUNCTION cas_in(input cstring) RETURNS cas LANGUAGE c IMMUTABLE PARALLEL SAFE AS 'MODULE_PATHNAME', 'cas_in_wrapper'";
    pg_and_generic().one_statement_parses_to(reordered, canonical);
}

/// `DROP FUNCTION IF EXISTS` with no argument list, with one argument list,
/// and with two comma-separated function descriptions.
#[test]
fn parse_drop_function() {
    // Unquoted single-part object name.
    let object = |name: &str| ObjectName::from(vec![Ident::new(name)]);
    // The `IN b INTEGER = 1` argument that appears in every signature below.
    let in_b_default_one = || OperateFunctionArg {
        mode: Some(ArgMode::In),
        name: Some(Ident::new("b")),
        data_type: DataType::Integer(None),
        default_expr: Some(Expr::Value(
            Value::Number("1".parse().unwrap(), false).with_empty_span(),
        )),
    };
    // Every statement here is `IF EXISTS` with no drop behavior.
    let drop_of = |func_desc: Vec<FunctionDesc>| {
        Statement::DropFunction(DropFunction {
            if_exists: true,
            func_desc,
            drop_behavior: None,
        })
    };

    let bare = "DROP FUNCTION IF EXISTS test_func";
    assert_eq!(
        pg().verified_stmt(bare),
        drop_of(vec![FunctionDesc {
            name: object("test_func"),
            args: None,
        }])
    );

    let with_args = "DROP FUNCTION IF EXISTS test_func(a INTEGER, IN b INTEGER = 1)";
    assert_eq!(
        pg().verified_stmt(with_args),
        drop_of(vec![FunctionDesc {
            name: object("test_func"),
            args: Some(vec![
                OperateFunctionArg::with_name("a", DataType::Integer(None)),
                in_b_default_one(),
            ]),
        }])
    );

    let two_functions = "DROP FUNCTION IF EXISTS test_func1(a INTEGER, IN b INTEGER = 1), test_func2(a VARCHAR, IN b INTEGER = 1)";
    assert_eq!(
        pg().verified_stmt(two_functions),
        drop_of(vec![
            FunctionDesc {
                name: object("test_func1"),
                args: Some(vec![
                    OperateFunctionArg::with_name("a", DataType::Integer(None)),
                    in_b_default_one(),
                ]),
            },
            FunctionDesc {
                name: object("test_func2"),
                args: Some(vec![
                    OperateFunctionArg::with_name("a", DataType::Varchar(None)),
                    in_b_default_one(),
                ]),
            },
        ])
    );
}

/// `DROP DOMAIN` with and without `IF EXISTS`, and with the `CASCADE` and
/// `RESTRICT` drop behaviors.
#[test]
fn parse_drop_domain() {
    // (statement, expected `if_exists`, expected `drop_behavior`)
    let cases = [
        ("DROP DOMAIN IF EXISTS jpeg_domain", true, None),
        ("DROP DOMAIN jpeg_domain", false, None),
        (
            "DROP DOMAIN IF EXISTS jpeg_domain CASCADE",
            true,
            Some(DropBehavior::Cascade),
        ),
        (
            "DROP DOMAIN IF EXISTS jpeg_domain RESTRICT",
            true,
            Some(DropBehavior::Restrict),
        ),
    ];

    for (sql, if_exists, drop_behavior) in cases {
        let expected = Statement::DropDomain(DropDomain {
            if_exists,
            name: ObjectName::from(vec![Ident::new("jpeg_domain")]),
            drop_behavior,
        });
        assert_eq!(pg().verified_stmt(sql), expected);
    }
}

/// `DROP PROCEDURE IF EXISTS` with no argument list, with one argument list,
/// and with two procedure descriptions; then two statements with trailing
/// tokens that must be rejected with a specific parser error.
#[test]
fn parse_drop_procedure() {
    // Unquoted single-part object name.
    let object = |name: &str| ObjectName::from(vec![Ident::new(name)]);
    // The `IN b INTEGER = 1` argument that appears in every signature below.
    let in_b_default_one = || OperateFunctionArg {
        mode: Some(ArgMode::In),
        name: Some(Ident::new("b")),
        data_type: DataType::Integer(None),
        default_expr: Some(Expr::Value(
            Value::Number("1".parse().unwrap(), false).with_empty_span(),
        )),
    };
    // Every successful statement here is `IF EXISTS` with no drop behavior.
    let drop_of = |proc_desc: Vec<FunctionDesc>| Statement::DropProcedure {
        if_exists: true,
        proc_desc,
        drop_behavior: None,
    };

    let bare = "DROP PROCEDURE IF EXISTS test_proc";
    assert_eq!(
        pg().verified_stmt(bare),
        drop_of(vec![FunctionDesc {
            name: object("test_proc"),
            args: None,
        }])
    );

    let with_args = "DROP PROCEDURE IF EXISTS test_proc(a INTEGER, IN b INTEGER = 1)";
    assert_eq!(
        pg().verified_stmt(with_args),
        drop_of(vec![FunctionDesc {
            name: object("test_proc"),
            args: Some(vec![
                OperateFunctionArg::with_name("a", DataType::Integer(None)),
                in_b_default_one(),
            ]),
        }])
    );

    let two_procedures = "DROP PROCEDURE IF EXISTS test_proc1(a INTEGER, IN b INTEGER = 1), test_proc2(a VARCHAR, IN b INTEGER = 1)";
    assert_eq!(
        pg().verified_stmt(two_procedures),
        drop_of(vec![
            FunctionDesc {
                name: object("test_proc1"),
                args: Some(vec![
                    OperateFunctionArg::with_name("a", DataType::Integer(None)),
                    in_b_default_one(),
                ]),
            },
            FunctionDesc {
                name: object("test_proc2"),
                args: Some(vec![
                    OperateFunctionArg::with_name("a", DataType::Varchar(None)),
                    in_b_default_one(),
                ]),
            },
        ])
    );

    // Trailing tokens after the procedure name must produce these exact errors.
    let rejected = [
        (
            "DROP PROCEDURE testproc DROP",
            "Expected: end of statement, found: DROP",
        ),
        (
            "DROP PROCEDURE testproc SET NULL",
            "Expected: end of statement, found: SET",
        ),
    ];
    for (sql, message) in rejected {
        let error = pg().parse_sql_statements(sql).unwrap_err();
        assert_eq!(ParserError::ParserError(message.to_string()), error);
    }
}

/// Dollar-quoted string literals: untagged, tagged, containing a `$`,
/// directly followed by an alias, and empty (untagged and tagged).
#[test]
fn parse_dollar_quoted_string() {
    let sql = "SELECT $$hello$$, $tag_name$world$tag_name$, $$Foo$Bar$$, $$Foo$Bar$$col_name, $$$$, $tag_name$$tag_name$";

    let statements = pg().parse_sql_statements(sql).unwrap();

    // Unwrap statement -> query -> select to reach the projection list.
    let query = match statements.first().unwrap() {
        Statement::Query(query) => query,
        _ => unreachable!(),
    };
    let projection = match &*query.body {
        SetExpr::Select(select) => &select.projection,
        _ => unreachable!(),
    };

    // Expected expression for a dollar-quoted literal with the given tag/value.
    let dollar_quoted = |tag: Option<&str>, value: &str| {
        Expr::Value(
            Value::DollarQuotedString(DollarQuotedString {
                tag: tag.map(Into::into),
                value: value.into(),
            })
            .with_empty_span(),
        )
    };

    assert_eq!(
        &dollar_quoted(None, "hello"),
        expr_from_projection(&projection[0])
    );
    assert_eq!(
        &dollar_quoted(Some("tag_name"), "world"),
        expr_from_projection(&projection[1])
    );
    assert_eq!(
        &dollar_quoted(None, "Foo$Bar"),
        expr_from_projection(&projection[2])
    );

    // The fourth item is the same literal immediately followed by an alias.
    assert_eq!(
        projection[3],
        SelectItem::ExprWithAlias {
            expr: dollar_quoted(None, "Foo$Bar"),
            alias: Ident::new("col_name"),
        },
    );

    assert_eq!(
        expr_from_projection(&projection[4]),
        &dollar_quoted(None, ""),
    );
    assert_eq!(
        expr_from_projection(&projection[5]),
        &dollar_quoted(Some("tag_name"), ""),
    );
}

/// Dollar-quoted strings with mismatched or unterminated delimiters must be
/// rejected.
#[test]
fn parse_incorrect_dollar_quoted_string() {
    let malformed = ["SELECT $x$hello$$", "SELECT $hello$$", "SELECT $$$"];
    for sql in malformed {
        assert!(pg().parse_sql_statements(sql).is_err());
    }
}

/// `GROUP BY size, GROUPING SETS (...)` yields a plain identifier followed by
/// an `Expr::GroupingSets` with one entry per set, including the empty set.
#[test]
fn parse_select_group_by_grouping_sets() {
    let column = |name: &str| Expr::Identifier(Ident::new(name));

    let select = pg_and_generic().verified_only_select(
        "SELECT brand, size, sum(sales) FROM items_sold GROUP BY size, GROUPING SETS ((brand), (size), ())"
    );

    let grouping_sets = Expr::GroupingSets(vec![
        vec![column("brand")],
        vec![column("size")],
        vec![],
    ]);
    let expected = GroupByExpr::Expressions(vec![column("size"), grouping_sets], vec![]);

    assert_eq!(expected, select.group_by);
}

/// `GROUP BY size, ROLLUP (brand, size)` yields a plain identifier followed by
/// an `Expr::Rollup` with one single-element entry per column.
#[test]
fn parse_select_group_by_rollup() {
    let column = |name: &str| Expr::Identifier(Ident::new(name));

    let select = pg_and_generic().verified_only_select(
        "SELECT brand, size, sum(sales) FROM items_sold GROUP BY size, ROLLUP (brand, size)",
    );

    let rollup = Expr::Rollup(vec![vec![column("brand")], vec![column("size")]]);
    let expected = GroupByExpr::Expressions(vec![column("size"), rollup], vec![]);

    assert_eq!(expected, select.group_by);
}

/// `GROUP BY size, CUBE (brand, size)` yields a plain identifier followed by
/// an `Expr::Cube` with one single-element entry per column.
#[test]
fn parse_select_group_by_cube() {
    let column = |name: &str| Expr::Identifier(Ident::new(name));

    let select = pg_and_generic().verified_only_select(
        "SELECT brand, size, sum(sales) FROM items_sold GROUP BY size, CUBE (brand, size)",
    );

    let cube = Expr::Cube(vec![vec![column("brand")], vec![column("size")]]);
    let expected = GroupByExpr::Expressions(vec![column("size"), cube], vec![]);

    assert_eq!(expected, select.group_by);
}

/// A bare `TRUNCATE db.table_name` yields a single non-`ONLY` target and no
/// optional clauses.
#[test]
fn parse_truncate() {
    let parsed = pg_and_generic().verified_stmt("TRUNCATE db.table_name");

    let expected = Statement::Truncate(Truncate {
        table_names: vec![TruncateTableTarget {
            name: ObjectName::from(vec![Ident::new("db"), Ident::new("table_name")]),
            only: false,
        }],
        partitions: None,
        table: false,
        identity: None,
        cascade: None,
        on_cluster: None,
    });

    assert_eq!(expected, parsed);
}

/// `TRUNCATE TABLE ONLY ... RESTART IDENTITY CASCADE` sets the `table` and
/// `only` flags and records the identity and cascade options.
#[test]
fn parse_truncate_with_options() {
    let sql = "TRUNCATE TABLE ONLY db.table_name RESTART IDENTITY CASCADE";
    let parsed = pg_and_generic().verified_stmt(sql);

    let only_target = TruncateTableTarget {
        name: ObjectName::from(vec![Ident::new("db"), Ident::new("table_name")]),
        only: true,
    };
    let expected = Statement::Truncate(Truncate {
        table_names: vec![only_target],
        partitions: None,
        table: true,
        identity: Some(TruncateIdentityOption::Restart),
        cascade: Some(CascadeOption::Cascade),
        on_cluster: None,
    });

    assert_eq!(expected, parsed);
}

/// `TRUNCATE TABLE` with two comma-separated tables yields two non-`ONLY`
/// targets in order, plus the identity and cascade options.
#[test]
fn parse_truncate_with_table_list() {
    // Non-`ONLY` target for a table in the `db` schema.
    let db_target = |table: &str| TruncateTableTarget {
        name: ObjectName::from(vec![Ident::new("db"), Ident::new(table)]),
        only: false,
    };

    let parsed = pg().verified_stmt(
        "TRUNCATE TABLE db.table_name, db.other_table_name RESTART IDENTITY CASCADE",
    );

    let expected = Statement::Truncate(Truncate {
        table_names: vec![db_target("table_name"), db_target("other_table_name")],
        partitions: None,
        table: true,
        identity: Some(TruncateIdentityOption::Restart),
        cascade: Some(CascadeOption::Cascade),
        on_cluster: None,
    });

    assert_eq!(expected, parsed);
}

/// `REGEXP` used as a table name, column name and alias in one query is run
/// through `verified_only_select`.
#[test]
fn parse_select_regexp_as_column_name() {
    let sql = "SELECT REGEXP.REGEXP AS REGEXP FROM REGEXP AS REGEXP WHERE REGEXP.REGEXP";
    pg_and_generic().verified_only_select(sql);
}

/// The type aliases `INT8`, `INT4`, `INT2`, `FLOAT8`, `FLOAT4` and `BOOL` in a
/// `CREATE TABLE` are expected to map to their dedicated `DataType` variants.
#[test]
fn parse_create_table_with_alias() {
    let sql = "CREATE TABLE public.datatype_aliases
    (
      int8_col INT8,
      int4_col INT4,
      int2_col INT2,
      float8_col FLOAT8,
      float4_col FLOAT4,
      bool_col BOOL
    );";

    // Column definition with no options.
    let plain_column = |name: &str, data_type: DataType| ColumnDef {
        name: Ident::new(name),
        data_type,
        options: vec![],
    };
    let expected_columns = vec![
        plain_column("int8_col", DataType::Int8(None)),
        plain_column("int4_col", DataType::Int4(None)),
        plain_column("int2_col", DataType::Int2(None)),
        plain_column("float8_col", DataType::Float8),
        plain_column("float4_col", DataType::Float4),
        plain_column("bool_col", DataType::Bool),
    ];

    match pg_and_generic().one_statement_parses_to(sql, "") {
        // The literal fields in the pattern are part of the assertion: any
        // other value falls through to `unreachable!()`.
        Statement::CreateTable(CreateTable {
            name,
            columns,
            constraints,
            if_not_exists: false,
            external: false,
            file_format: None,
            location: None,
            ..
        }) => {
            assert_eq!("public.datatype_aliases", name.to_string());
            assert_eq!(columns, expected_columns);
            assert!(constraints.is_empty());
        }
        _ => unreachable!(),
    }
}

/// `PARTITION BY RANGE(a)` on a `CREATE TABLE` is stored as a function
/// expression named `RANGE` with the single unnamed argument `a`.
#[test]
fn parse_create_table_with_partition_by() {
    let sql = "CREATE TABLE t1 (a INT, b TEXT) PARTITION BY RANGE(a)";

    let create_table = match pg_and_generic().verified_stmt(sql) {
        Statement::CreateTable(create_table) => create_table,
        _ => unreachable!(),
    };

    assert_eq!("t1", create_table.name.to_string());

    let expected_columns = vec![
        ColumnDef {
            name: Ident::new("a"),
            data_type: DataType::Int(None),
            options: vec![],
        },
        ColumnDef {
            name: Ident::new("b"),
            data_type: DataType::Text,
            options: vec![],
        },
    ];
    assert_eq!(expected_columns, create_table.columns);

    // The partition expression must be present and must be a function call.
    let partition_fn = match *create_table.partition_by.unwrap() {
        Expr::Function(function) => function,
        _ => unreachable!(),
    };
    assert_eq!("RANGE", partition_fn.name.to_string());

    let expected_args = FunctionArguments::List(FunctionArgumentList {
        duplicate_treatment: None,
        clauses: vec![],
        args: vec![FunctionArg::Unnamed(FunctionArgExpr::Expr(
            Expr::Identifier(Ident::new("a")),
        ))],
    });
    assert_eq!(expected_args, partition_fn.args);
}

/// A `JOIN UNNEST(...) AS f ON ...` keeps the alias on the `UNNEST` table
/// factor and the `ON` condition on the join operator.
#[test]
fn parse_join_constraint_unnest_alias() {
    let select = pg().verified_only_select("SELECT * FROM t1 JOIN UNNEST(t1.a) AS f ON c1 = c2");
    let joins = only(select.from).joins;

    let unnest = TableFactor::UNNEST {
        alias: table_alias(true, "f"),
        array_exprs: vec![Expr::CompoundIdentifier(vec![
            Ident::new("t1"),
            Ident::new("a"),
        ])],
        with_offset: false,
        with_offset_alias: None,
        with_ordinality: false,
    };
    let on_c1_eq_c2 = JoinConstraint::On(Expr::BinaryOp {
        left: Box::new(Expr::Identifier(Ident::new("c1"))),
        op: BinaryOperator::Eq,
        right: Box::new(Expr::Identifier(Ident::new("c2"))),
    });

    assert_eq!(
        joins,
        vec![Join {
            relation: unnest,
            global: false,
            join_operator: JoinOperator::Join(on_c1_eq_c2),
        }]
    );
}

/// A `WITH` query whose second CTE is an aliased `INSERT ... ON CONFLICT ...
/// DO UPDATE` is run through `verified_stmt`.
#[test]
fn test_complex_postgres_insert_with_alias() {
    pg().verified_stmt(
        "WITH existing AS (SELECT test_table.id FROM test_tables AS test_table WHERE (a = 12) AND (b = 34)), inserted AS (INSERT INTO test_tables AS test_table (id, a, b, c) VALUES (DEFAULT, 56, 78, 90) ON CONFLICT(a, b) DO UPDATE SET c = EXCLUDED.c WHERE (test_table.c <> EXCLUDED.c)) SELECT c FROM existing",
    );
}

/// An `INSERT INTO <table> AS <alias>` statement records the alias in
/// `table_alias` and keeps the `VALUES` row as the insert source.
///
/// This single test replaces two copies that were gated on the `bigdecimal`
/// feature and differed only in how the numeric literal `123` was built. The
/// literal is now produced with `"123".parse().unwrap()`, the same
/// feature-agnostic pattern other tests in this file use for `Value::Number`,
/// so the expected AST is written once for both configurations.
#[test]
fn test_simple_postgres_insert_with_alias() {
    let sql2 = "INSERT INTO test_tables AS test_table (id, a) VALUES (DEFAULT, 123)";

    let statement = pg().verified_stmt(sql2);

    assert_eq!(
        statement,
        Statement::Insert(Insert {
            insert_token: AttachedToken::empty(),
            or: None,
            ignore: false,
            into: true,
            table: TableObject::TableName(ObjectName::from(vec![Ident {
                value: "test_tables".to_string(),
                quote_style: None,
                span: Span::empty(),
            }])),
            table_alias: Some(Ident {
                value: "test_table".to_string(),
                quote_style: None,
                span: Span::empty(),
            }),
            columns: vec![
                Ident {
                    value: "id".to_string(),
                    quote_style: None,
                    span: Span::empty(),
                },
                Ident {
                    value: "a".to_string(),
                    quote_style: None,
                    span: Span::empty(),
                }
            ],
            overwrite: false,
            source: Some(Box::new(Query {
                with: None,
                body: Box::new(SetExpr::Values(Values {
                    value_keyword: false,
                    explicit_row: false,
                    rows: vec![vec![
                        Expr::Identifier(Ident::new("DEFAULT")),
                        // `parse()` targets whichever numeric representation
                        // `Value::Number` holds in the current build.
                        Expr::Value(
                            (Value::Number("123".parse().unwrap(), false)).with_empty_span()
                        )
                    ]]
                })),
                order_by: None,
                limit_clause: None,
                fetch: None,
                locks: vec![],
                for_clause: None,
                settings: None,
                format_clause: None,
                pipe_operators: vec![],
            })),
            assignments: vec![],
            partitioned: None,
            after_columns: vec![],
            has_table_keyword: false,
            on: None,
            returning: None,
            replace_into: false,
            priority: None,
            insert_alias: None,
            settings: None,
            format_clause: None,
        })
    );
}

#[test]
/// `INSERT INTO ... AS "Test_Table"` must keep the double-quoted alias (quote
/// style and letter case) in `table_alias`, alongside the column list and the
/// single `VALUES` row.
fn test_simple_insert_with_quoted_alias() {
    let sql = r#"INSERT INTO test_tables AS "Test_Table" (id, a) VALUES (DEFAULT, '0123')"#;

    // The one VALUES row: DEFAULT (expected as a plain identifier) followed by
    // a single-quoted string literal.
    let values_row = vec![
        Expr::Identifier(Ident::new("DEFAULT")),
        Expr::Value((Value::SingleQuotedString("0123".to_string())).with_empty_span()),
    ];

    let source = Query {
        with: None,
        body: Box::new(SetExpr::Values(Values {
            value_keyword: false,
            explicit_row: false,
            rows: vec![values_row],
        })),
        order_by: None,
        limit_clause: None,
        fetch: None,
        locks: vec![],
        for_clause: None,
        settings: None,
        format_clause: None,
        pipe_operators: vec![],
    };

    let expected = Statement::Insert(Insert {
        insert_token: AttachedToken::empty(),
        or: None,
        ignore: false,
        into: true,
        table: TableObject::TableName(ObjectName::from(vec![Ident::new("test_tables")])),
        // The alias is the only quoted identifier in the statement.
        table_alias: Some(Ident::with_quote('"', "Test_Table")),
        columns: vec![Ident::new("id"), Ident::new("a")],
        overwrite: false,
        source: Some(Box::new(source)),
        assignments: vec![],
        partitioned: None,
        after_columns: vec![],
        has_table_keyword: false,
        on: None,
        returning: None,
        replace_into: false,
        priority: None,
        insert_alias: None,
        settings: None,
        format_clause: None,
    });

    assert_eq!(pg().verified_stmt(sql), expected);
}

#[test]
/// Qualified wildcards (`tbl.*`, `schema.tbl.*`) must round-trip as function
/// arguments, both for an ordinary function and for `ARRAY_AGG`.
fn parse_array_agg() {
    let queries = [
        // follows general function with wildcard code path
        "SELECT GREATEST(sections_tbl.*) AS sections FROM sections_tbl",
        // follows special-case array_agg code path
        "SELECT ARRAY_AGG(sections_tbl.*) AS sections FROM sections_tbl",
        // handles multi-part identifier with general code path
        "SELECT GREATEST(my_schema.sections_tbl.*) AS sections FROM sections_tbl",
        // handles multi-part identifier with array_agg code path
        "SELECT ARRAY_AGG(my_schema.sections_tbl.*) AS sections FROM sections_tbl",
    ];

    for sql in queries {
        pg().verified_stmt(sql);
    }
}

#[test]
/// CTEs annotated with `MATERIALIZED` / `NOT MATERIALIZED` must round-trip.
fn parse_mat_cte() {
    let queries = [
        "WITH cte AS MATERIALIZED (SELECT id FROM accounts) SELECT id FROM cte",
        "WITH cte AS NOT MATERIALIZED (SELECT id FROM accounts) SELECT id FROM cte",
    ];

    for sql in queries {
        pg().verified_stmt(sql);
    }
}

#[test]
/// `AT TIME ZONE` round-trips with an identifier or a parenthesised expression
/// as the zone, and binds tighter than `+` but looser than `::`.
fn parse_at_time_zone() {
    for sql in [
        "CURRENT_TIMESTAMP AT TIME ZONE tz",
        "CURRENT_TIMESTAMP AT TIME ZONE ('America/' || 'Los_Angeles')",
    ] {
        pg_and_generic().verified_expr(sql);
    }

    // check precedence: the expected tree is
    // `(<timestamp> AT TIME ZONE (<zone>::TEXT)) + <interval>`.
    let timestamp = Expr::TypedString(TypedString {
        data_type: DataType::Timestamp(None, TimezoneInfo::None),
        value: ValueWithSpan {
            value: Value::SingleQuotedString("2001-09-28 01:00".to_string()),
            span: Span::empty(),
        },
        uses_odbc_syntax: false,
    });

    let time_zone = Expr::Cast {
        kind: CastKind::DoubleColon,
        expr: Box::new(Expr::Value(
            Value::SingleQuotedString("America/Los_Angeles".to_owned()).with_empty_span(),
        )),
        data_type: DataType::Text,
        format: None,
    };

    let interval = Expr::Interval(Interval {
        value: Box::new(Expr::Value(
            Value::SingleQuotedString("23 hours".to_owned()).with_empty_span(),
        )),
        leading_field: None,
        leading_precision: None,
        last_field: None,
        fractional_seconds_precision: None,
    });

    let expected = Expr::BinaryOp {
        left: Box::new(Expr::AtTimeZone {
            timestamp: Box::new(timestamp),
            time_zone: Box::new(time_zone),
        }),
        op: BinaryOperator::Plus,
        right: Box::new(interval),
    };

    pretty_assertions::assert_eq!(
        pg_and_generic().verified_expr(
            "TIMESTAMP '2001-09-28 01:00' AT TIME ZONE 'America/Los_Angeles'::TEXT + INTERVAL '23 hours'",
        ),
        expected
    );
}

#[test]
/// The `INTERVAL` data type round-trips bare, with a precision (0 through 6),
/// with a field qualifier, and with both, in column definitions, `::` casts
/// and `CAST(... AS ...)`.
fn parse_interval_data_type() {
    // Checks one spelling of the type in all three syntactic positions.
    let verify_in_all_positions = |interval_type: &str| {
        let dialects = pg_and_generic();
        dialects.verified_stmt(&format!("CREATE TABLE t (i {interval_type})"));
        dialects.verified_stmt(&format!("SELECT '1 second'::{interval_type}"));
        dialects.verified_stmt(&format!("SELECT CAST('1 second' AS {interval_type})"));
    };

    let fields = [
        "YEAR",
        "MONTH",
        "DAY",
        "HOUR",
        "MINUTE",
        "SECOND",
        "YEAR TO MONTH",
        "DAY TO HOUR",
        "DAY TO MINUTE",
        "DAY TO SECOND",
        "HOUR TO MINUTE",
        "HOUR TO SECOND",
        "MINUTE TO SECOND",
    ];

    // The bare type is only checked as a column type.
    pg_and_generic().verified_stmt("CREATE TABLE t (i INTERVAL)");

    // Precision only.
    for p in 0..=6 {
        verify_in_all_positions(&format!("INTERVAL({p})"));
    }

    // Field qualifier only.
    for field in fields {
        verify_in_all_positions(&format!("INTERVAL {field}"));
    }

    // Field qualifier followed by a precision.
    for p in 0..=6 {
        for field in fields {
            verify_in_all_positions(&format!("INTERVAL {field}({p})"));
        }
    }
}

#[test]
/// `CREATE TABLE ... WITH (k = v, ...)` must expose its options, in order, as
/// `CreateTableOptions::With` key/value pairs.
fn parse_create_table_with_options() {
    let sql = "CREATE TABLE t (c INT) WITH (foo = 'bar', a = 123)";

    let expected_options = vec![
        SqlOption::KeyValue {
            key: "foo".into(),
            value: Expr::Value((Value::SingleQuotedString("bar".into())).with_empty_span()),
        },
        SqlOption::KeyValue {
            key: "a".into(),
            value: Expr::value(number("123")),
        },
    ];

    // Any other statement kind or options variant is a test failure.
    match pg().verified_stmt(sql) {
        Statement::CreateTable(CreateTable {
            table_options: CreateTableOptions::With(with_options),
            ..
        }) => assert_eq!(expected_options, with_options),
        _ => unreachable!(),
    }
}

#[test]
/// A table function followed by `WITH ORDINALITY` must parse to a single
/// `TableFactor::Table` whose `with_ordinality` flag is set.
fn test_table_function_with_ordinality() {
    let select = pg_and_generic()
        .verified_only_select("SELECT * FROM generate_series(1, 10) WITH ORDINALITY AS t");
    let relations = select.from;
    assert_eq!(1, relations.len());

    match &relations[0].relation {
        TableFactor::Table {
            name,
            with_ordinality: true,
            ..
        } => assert_eq!("generate_series", name.to_string().as_str()),
        _ => panic!("Expecting TableFactor::Table with ordinality"),
    }
}

#[test]
/// `UNNEST(...) WITH ORDINALITY` must parse to a single `TableFactor::UNNEST`
/// whose `with_ordinality` flag is set.
fn test_table_unnest_with_ordinality() {
    let select = pg_and_generic()
        .verified_only_select("SELECT * FROM UNNEST([10, 20, 30]) WITH ORDINALITY AS t");
    let relations = select.from;
    assert_eq!(1, relations.len());

    assert!(
        matches!(
            relations[0].relation,
            TableFactor::UNNEST {
                with_ordinality: true,
                ..
            }
        ),
        "Expecting TableFactor::UNNEST with ordinality"
    );
}

#[test]
/// `E'\n'` must parse to an escaped string literal holding a real newline.
fn test_escaped_string_literal() {
    let parsed = pg().verified_expr(r#"E'\n'"#);

    if let Expr::Value(ValueWithSpan {
        value: Value::EscapedStringLiteral(unescaped),
        ..
    }) = parsed
    {
        assert_eq!("\n", unescaped);
    } else {
        unreachable!()
    }
}

#[test]
/// `CREATE DOMAIN` round-trips with each combination of the optional `COLLATE`
/// and `DEFAULT` clauses, and with a named `CONSTRAINT`; every case carries the
/// same `CHECK (VALUE > 0)` constraint.
fn parse_create_domain() {
    // (SQL text, expected collation, expected default, expected constraint name)
    let cases = [
        (
            "CREATE DOMAIN my_domain AS INTEGER CHECK (VALUE > 0)",
            None,
            None,
            None,
        ),
        (
            "CREATE DOMAIN my_domain AS INTEGER COLLATE \"en_US\" CHECK (VALUE > 0)",
            Some(Ident::with_quote('"', "en_US")),
            None,
            None,
        ),
        (
            "CREATE DOMAIN my_domain AS INTEGER DEFAULT 1 CHECK (VALUE > 0)",
            None,
            Some(Expr::Value(test_utils::number("1").into())),
            None,
        ),
        (
            "CREATE DOMAIN my_domain AS INTEGER COLLATE \"en_US\" DEFAULT 1 CHECK (VALUE > 0)",
            Some(Ident::with_quote('"', "en_US")),
            Some(Expr::Value(test_utils::number("1").into())),
            None,
        ),
        (
            "CREATE DOMAIN my_domain AS INTEGER CONSTRAINT my_constraint CHECK (VALUE > 0)",
            None,
            None,
            Some(Ident::new("my_constraint")),
        ),
    ];

    for (sql, collation, default, constraint_name) in cases {
        // `CHECK (VALUE > 0)`, optionally named.
        let value_is_positive = CheckConstraint {
            name: constraint_name,
            expr: Box::new(Expr::BinaryOp {
                left: Box::new(Expr::Identifier(Ident::new("VALUE"))),
                op: BinaryOperator::Gt,
                right: Box::new(Expr::Value(test_utils::number("0").into())),
            }),
            enforced: None,
        };

        let expected = Statement::CreateDomain(CreateDomain {
            name: ObjectName::from(vec![Ident::new("my_domain")]),
            data_type: DataType::Integer(None),
            collation,
            default,
            constraints: vec![value_is_positive.into()],
        });

        assert_eq!(pg().verified_stmt(sql), expected);
    }
}

#[test]
/// A minimal row-level `BEFORE INSERT` trigger: no condition, no `REFERENCING`
/// clause, and a function reference written without an argument list.
fn parse_create_simple_before_insert_trigger() {
    let sql = "CREATE TRIGGER check_insert BEFORE INSERT ON accounts FOR EACH ROW EXECUTE FUNCTION check_account_insert";

    // The function is named without parentheses, so no argument list is expected.
    let exec_body = TriggerExecBody {
        exec_type: TriggerExecBodyType::Function,
        func_desc: FunctionDesc {
            name: ObjectName::from(vec![Ident::new("check_account_insert")]),
            args: None,
        },
    };

    let expected_stmt = Statement::CreateTrigger(CreateTrigger {
        or_alter: false,
        temporary: false,
        or_replace: false,
        is_constraint: false,
        name: ObjectName::from(vec![Ident::new("check_insert")]),
        period: Some(TriggerPeriod::Before),
        period_before_table: true,
        events: vec![TriggerEvent::Insert],
        table_name: ObjectName::from(vec![Ident::new("accounts")]),
        referenced_table_name: None,
        referencing: vec![],
        trigger_object: Some(TriggerObjectKind::ForEach(TriggerObject::Row)),
        condition: None,
        exec_body: Some(exec_body),
        statements_as: false,
        statements: None,
        characteristics: None,
    });

    assert_eq!(pg().verified_stmt(sql), expected_stmt);
}

#[test]
/// A row-level `AFTER UPDATE` trigger with a `WHEN (...)` clause; the condition
/// is expected to keep its surrounding parentheses as `Expr::Nested`.
fn parse_create_after_update_trigger_with_condition() {
    let sql = "CREATE TRIGGER check_update AFTER UPDATE ON accounts FOR EACH ROW WHEN (NEW.balance > 10000) EXECUTE FUNCTION check_account_update";

    // `(NEW.balance > 10000)`
    let when_condition = Expr::Nested(Box::new(Expr::BinaryOp {
        left: Box::new(Expr::CompoundIdentifier(vec![
            Ident::new("NEW"),
            Ident::new("balance"),
        ])),
        op: BinaryOperator::Gt,
        right: Box::new(Expr::value(number("10000"))),
    }));

    let exec_body = TriggerExecBody {
        exec_type: TriggerExecBodyType::Function,
        func_desc: FunctionDesc {
            name: ObjectName::from(vec![Ident::new("check_account_update")]),
            args: None,
        },
    };

    let expected_stmt = Statement::CreateTrigger(CreateTrigger {
        or_alter: false,
        temporary: false,
        or_replace: false,
        is_constraint: false,
        name: ObjectName::from(vec![Ident::new("check_update")]),
        period: Some(TriggerPeriod::After),
        period_before_table: true,
        // `UPDATE` without an `OF <columns>` list.
        events: vec![TriggerEvent::Update(vec![])],
        table_name: ObjectName::from(vec![Ident::new("accounts")]),
        referenced_table_name: None,
        referencing: vec![],
        trigger_object: Some(TriggerObjectKind::ForEach(TriggerObject::Row)),
        condition: Some(when_condition),
        exec_body: Some(exec_body),
        statements_as: false,
        statements: None,
        characteristics: None,
    });

    assert_eq!(pg().verified_stmt(sql), expected_stmt);
}

#[test]
/// A row-level `INSTEAD OF DELETE` trigger without condition or `REFERENCING`
/// clause.
fn parse_create_instead_of_delete_trigger() {
    let sql = "CREATE TRIGGER check_delete INSTEAD OF DELETE ON accounts FOR EACH ROW EXECUTE FUNCTION check_account_deletes";

    let exec_body = TriggerExecBody {
        exec_type: TriggerExecBodyType::Function,
        func_desc: FunctionDesc {
            name: ObjectName::from(vec![Ident::new("check_account_deletes")]),
            args: None,
        },
    };

    let expected_stmt = Statement::CreateTrigger(CreateTrigger {
        or_alter: false,
        temporary: false,
        or_replace: false,
        is_constraint: false,
        name: ObjectName::from(vec![Ident::new("check_delete")]),
        period: Some(TriggerPeriod::InsteadOf),
        period_before_table: true,
        events: vec![TriggerEvent::Delete],
        table_name: ObjectName::from(vec![Ident::new("accounts")]),
        referenced_table_name: None,
        referencing: vec![],
        trigger_object: Some(TriggerObjectKind::ForEach(TriggerObject::Row)),
        condition: None,
        exec_body: Some(exec_body),
        statements_as: false,
        statements: None,
        characteristics: None,
    });

    assert_eq!(pg().verified_stmt(sql), expected_stmt);
}

#[test]
/// A `CREATE CONSTRAINT TRIGGER` firing on `INSERT OR UPDATE OR DELETE` and
/// declared `DEFERRABLE INITIALLY DEFERRED`.
fn parse_create_trigger_with_multiple_events_and_deferrable() {
    let sql = "CREATE CONSTRAINT TRIGGER check_multiple_events BEFORE INSERT OR UPDATE OR DELETE ON accounts DEFERRABLE INITIALLY DEFERRED FOR EACH ROW EXECUTE FUNCTION check_account_changes";

    // Events are expected in the order they are written in the statement.
    let events = vec![
        TriggerEvent::Insert,
        TriggerEvent::Update(vec![]),
        TriggerEvent::Delete,
    ];

    // `DEFERRABLE INITIALLY DEFERRED`
    let characteristics = ConstraintCharacteristics {
        deferrable: Some(true),
        initially: Some(DeferrableInitial::Deferred),
        enforced: None,
    };

    let exec_body = TriggerExecBody {
        exec_type: TriggerExecBodyType::Function,
        func_desc: FunctionDesc {
            name: ObjectName::from(vec![Ident::new("check_account_changes")]),
            args: None,
        },
    };

    let expected_stmt = Statement::CreateTrigger(CreateTrigger {
        or_alter: false,
        temporary: false,
        or_replace: false,
        is_constraint: true,
        name: ObjectName::from(vec![Ident::new("check_multiple_events")]),
        period: Some(TriggerPeriod::Before),
        period_before_table: true,
        events,
        table_name: ObjectName::from(vec![Ident::new("accounts")]),
        referenced_table_name: None,
        referencing: vec![],
        trigger_object: Some(TriggerObjectKind::ForEach(TriggerObject::Row)),
        condition: None,
        exec_body: Some(exec_body),
        statements_as: false,
        statements: None,
        characteristics: Some(characteristics),
    });

    assert_eq!(pg().verified_stmt(sql), expected_stmt);
}

#[test]
/// A trigger with a `REFERENCING NEW TABLE AS ... OLD TABLE AS ...` clause; both
/// transition relations are expected in source order with `is_as` set.
fn parse_create_trigger_with_referencing() {
    let sql = "CREATE TRIGGER check_referencing BEFORE INSERT ON accounts REFERENCING NEW TABLE AS new_accounts OLD TABLE AS old_accounts FOR EACH ROW EXECUTE FUNCTION check_account_referencing";

    let new_table = TriggerReferencing {
        refer_type: TriggerReferencingType::NewTable,
        is_as: true,
        transition_relation_name: ObjectName::from(vec![Ident::new("new_accounts")]),
    };
    let old_table = TriggerReferencing {
        refer_type: TriggerReferencingType::OldTable,
        is_as: true,
        transition_relation_name: ObjectName::from(vec![Ident::new("old_accounts")]),
    };

    let exec_body = TriggerExecBody {
        exec_type: TriggerExecBodyType::Function,
        func_desc: FunctionDesc {
            name: ObjectName::from(vec![Ident::new("check_account_referencing")]),
            args: None,
        },
    };

    let expected_stmt = Statement::CreateTrigger(CreateTrigger {
        or_alter: false,
        temporary: false,
        or_replace: false,
        is_constraint: false,
        name: ObjectName::from(vec![Ident::new("check_referencing")]),
        period: Some(TriggerPeriod::Before),
        period_before_table: true,
        events: vec![TriggerEvent::Insert],
        table_name: ObjectName::from(vec![Ident::new("accounts")]),
        referenced_table_name: None,
        referencing: vec![new_table, old_table],
        trigger_object: Some(TriggerObjectKind::ForEach(TriggerObject::Row)),
        condition: None,
        exec_body: Some(exec_body),
        statements_as: false,
        statements: None,
        characteristics: None,
    });

    assert_eq!(pg().verified_stmt(sql), expected_stmt);
}

#[test]
/// Malformed `CREATE TRIGGER` statements must be rejected, each with the exact
/// parser error message listed next to it.
fn parse_create_trigger_invalid_cases() {
    // (invalid SQL, expected error text without the "sql parser error: " prefix)
    let invalid_cases = [
        (
            "CREATE TRIGGER check_update BEFORE UPDATE ON accounts FUNCTION check_account_update",
            "Expected: an SQL statement, found: FUNCTION",
        ),
        (
            "CREATE TRIGGER check_update TOMORROW UPDATE ON accounts EXECUTE FUNCTION check_account_update",
            "Expected: one of INSERT or UPDATE or DELETE or TRUNCATE, found: TOMORROW",
        ),
        (
            "CREATE TRIGGER check_update BEFORE SAVE ON accounts EXECUTE FUNCTION check_account_update",
            "Expected: one of INSERT or UPDATE or DELETE or TRUNCATE, found: SAVE",
        ),
    ];

    for (sql, expected_error) in invalid_cases {
        // Panics here if the statement unexpectedly parses.
        let error = pg().parse_sql_statements(sql).unwrap_err();
        assert_eq!(
            format!("sql parser error: {expected_error}"),
            error.to_string()
        );
    }
}

#[test]
/// `DROP TRIGGER` round-trips for every combination of the optional
/// `IF EXISTS` clause and the optional trailing `CASCADE` / `RESTRICT`.
fn parse_drop_trigger() {
    for if_exists in [true, false] {
        let if_exists_clause = if if_exists { " IF EXISTS" } else { "" };

        for option in [
            None,
            Some(ReferentialAction::Cascade),
            Some(ReferentialAction::Restrict),
        ] {
            // The drop behaviour is rendered with a leading space, or omitted entirely.
            let option_clause = match option {
                Some(action) => format!(" {action}"),
                None => String::new(),
            };
            let sql =
                format!("DROP TRIGGER{if_exists_clause} check_update ON table_name{option_clause}");

            let expected = Statement::DropTrigger(DropTrigger {
                if_exists,
                trigger_name: ObjectName::from(vec![Ident::new("check_update")]),
                table_name: Some(ObjectName::from(vec![Ident::new("table_name")])),
                option,
            });

            assert_eq!(pg().verified_stmt(&sql), expected);
        }
    }
}

#[test]
/// `DROP TRIGGER` accepts at most one drop behaviour; any extra `CASCADE` or
/// `RESTRICT` keyword must be reported as unexpected trailing input.
fn parse_drop_trigger_invalid_cases() {
    // (invalid SQL, expected error text without the "sql parser error: " prefix)
    let invalid_cases = [
        (
            "DROP TRIGGER check_update ON table_name CASCADE RESTRICT",
            "Expected: end of statement, found: RESTRICT",
        ),
        (
            "DROP TRIGGER check_update ON table_name CASCADE CASCADE",
            "Expected: end of statement, found: CASCADE",
        ),
        (
            "DROP TRIGGER check_update ON table_name CASCADE CASCADE CASCADE",
            "Expected: end of statement, found: CASCADE",
        ),
    ];

    for (sql, expected_error) in invalid_cases {
        // Panics here if the statement unexpectedly parses.
        let error = pg().parse_sql_statements(sql).unwrap_err();
        assert_eq!(
            format!("sql parser error: {expected_error}"),
            error.to_string()
        );
    }
}

#[test]
/// Parses a four-statement script (create table, create trigger function,
/// create trigger, drop trigger) in one go and checks the AST of each statement.
fn parse_trigger_related_functions() {
    // The script is assembled from the four pieces below.
    // The following example is taken from the PostgreSQL documentation <https://www.postgresql.org/docs/current/plpgsql-trigger.html>

    let sql_table_creation = r#"
    CREATE TABLE emp (
        empname           text,
        salary            integer,
        last_date         timestamp,
        last_user         text
    );
    "#;

    let sql_create_function = r#"
    CREATE FUNCTION emp_stamp() RETURNS trigger AS $emp_stamp$
        BEGIN
            -- Check that empname and salary are given
            IF NEW.empname IS NULL THEN
                RAISE EXCEPTION 'empname cannot be null';
            END IF;
            IF NEW.salary IS NULL THEN
                RAISE EXCEPTION '% cannot have null salary', NEW.empname;
            END IF;

            -- Who works for us when they must pay for it?
            IF NEW.salary < 0 THEN
                RAISE EXCEPTION '% cannot have a negative salary', NEW.empname;
            END IF;

            -- Remember who changed the payroll when
            NEW.last_date := current_timestamp;
            NEW.last_user := current_user;
            RETURN NEW;
        END;
    $emp_stamp$ LANGUAGE plpgsql;
    "#;

    let sql_create_trigger = r#"
    CREATE TRIGGER emp_stamp BEFORE INSERT OR UPDATE ON emp
        FOR EACH ROW EXECUTE FUNCTION emp_stamp();
    "#;

    let sql_drop_trigger = r#"
    DROP TRIGGER emp_stamp ON emp;
    "#;

    // Parse the whole script at once and split the result, in source order.
    let statements = pg()
        .parse_sql_statements(&format!(
            "{sql_table_creation}{sql_create_function}{sql_create_trigger}{sql_drop_trigger}"
        ))
        .unwrap();

    assert_eq!(statements.len(), 4);
    let mut parsed = statements.into_iter();
    let create_table = parsed.next().unwrap();
    let create_function = parsed.next().unwrap();
    let create_trigger = parsed.next().unwrap();
    let drop_trigger = parsed.next().unwrap();

    // First statement: CREATE TABLE emp (...)
    let create_table = match create_table {
        Statement::CreateTable(create_table) => create_table,
        _ => panic!("Expected CreateTable statement"),
    };

    // All four columns are plain `name type` definitions without options.
    let column = |name: &str, data_type: DataType| ColumnDef {
        name: name.into(),
        data_type,
        options: vec![],
    };

    let expected_table = CreateTable {
        or_replace: false,
        temporary: false,
        external: false,
        global: None,
        dynamic: false,
        if_not_exists: false,
        transient: false,
        volatile: false,
        iceberg: false,
        name: ObjectName::from(vec![Ident::new("emp")]),
        columns: vec![
            column("empname", DataType::Text),
            column("salary", DataType::Integer(None)),
            column("last_date", DataType::Timestamp(None, TimezoneInfo::None)),
            column("last_user", DataType::Text),
        ],
        constraints: vec![],
        hive_distribution: HiveDistributionStyle::NONE,
        hive_formats: None,
        file_format: None,
        location: None,
        query: None,
        without_rowid: false,
        like: None,
        clone: None,
        comment: None,
        on_commit: None,
        on_cluster: None,
        primary_key: None,
        order_by: None,
        partition_by: None,
        cluster_by: None,
        clustered_by: None,
        inherits: None,
        strict: false,
        copy_grants: false,
        enable_schema_evolution: None,
        change_tracking: None,
        data_retention_time_in_days: None,
        max_data_extension_time_in_days: None,
        default_ddl_collation: None,
        with_aggregation_policy: None,
        with_row_access_policy: None,
        with_tags: None,
        base_location: None,
        external_volume: None,
        catalog: None,
        catalog_sync: None,
        storage_serialization_policy: None,
        table_options: CreateTableOptions::None,
        target_lag: None,
        warehouse: None,
        version: None,
        refresh_mode: None,
        initialize: None,
        require_user: false,
    };

    assert_eq!(create_table, expected_table);

    // Second statement: CREATE FUNCTION emp_stamp() ...
    // The dollar-quoted body is expected verbatim, whitespace and comments included.
    let function_source = DollarQuotedString {
        value: "\n        BEGIN\n            -- Check that empname and salary are given\n            IF NEW.empname IS NULL THEN\n                RAISE EXCEPTION 'empname cannot be null';\n            END IF;\n            IF NEW.salary IS NULL THEN\n                RAISE EXCEPTION '% cannot have null salary', NEW.empname;\n            END IF;\n\n            -- Who works for us when they must pay for it?\n            IF NEW.salary < 0 THEN\n                RAISE EXCEPTION '% cannot have a negative salary', NEW.empname;\n            END IF;\n\n            -- Remember who changed the payroll when\n            NEW.last_date := current_timestamp;\n            NEW.last_user := current_user;\n            RETURN NEW;\n        END;\n    ".to_owned(),
        tag: Some("emp_stamp".to_owned()),
    };

    let expected_function = Statement::CreateFunction(CreateFunction {
        or_alter: false,
        or_replace: false,
        temporary: false,
        if_not_exists: false,
        name: ObjectName::from(vec![Ident::new("emp_stamp")]),
        args: Some(vec![]),
        return_type: Some(DataType::Trigger),
        function_body: Some(CreateFunctionBody::AsBeforeOptions {
            body: Expr::Value((Value::DollarQuotedString(function_source)).with_empty_span()),
            link_symbol: None,
        }),
        behavior: None,
        called_on_null: None,
        parallel: None,
        using: None,
        language: Some(Ident::new("plpgsql")),
        determinism_specifier: None,
        options: None,
        remote_connection: None,
    });

    assert_eq!(create_function, expected_function);

    // Third statement: CREATE TRIGGER emp_stamp ...
    // The function is referenced with an empty argument list, `emp_stamp()`.
    let exec_body = TriggerExecBody {
        exec_type: TriggerExecBodyType::Function,
        func_desc: FunctionDesc {
            name: ObjectName::from(vec![Ident::new("emp_stamp")]),
            args: Some(vec![]),
        },
    };

    let expected_trigger = Statement::CreateTrigger(CreateTrigger {
        or_alter: false,
        temporary: false,
        or_replace: false,
        is_constraint: false,
        name: ObjectName::from(vec![Ident::new("emp_stamp")]),
        period: Some(TriggerPeriod::Before),
        period_before_table: true,
        events: vec![TriggerEvent::Insert, TriggerEvent::Update(vec![])],
        table_name: ObjectName::from(vec![Ident::new("emp")]),
        referenced_table_name: None,
        referencing: vec![],
        trigger_object: Some(TriggerObjectKind::ForEach(TriggerObject::Row)),
        condition: None,
        exec_body: Some(exec_body),
        statements_as: false,
        statements: None,
        characteristics: None,
    });

    assert_eq!(create_trigger, expected_trigger);

    // Fourth statement: DROP TRIGGER emp_stamp ON emp
    let expected_drop = Statement::DropTrigger(DropTrigger {
        if_exists: false,
        trigger_name: ObjectName::from(vec![Ident::new("emp_stamp")]),
        table_name: Some(ObjectName::from(vec![Ident::new("emp")])),
        option: None,
    });

    assert_eq!(drop_trigger, expected_drop);
}

#[test]
/// `U&'...'` literals must parse to `Value::UnicodeStringLiteral` holding the
/// decoded text.
fn test_unicode_string_literal() {
    // (SQL literal, expected decoded contents)
    let pairs = [
        // Example from the postgres docs
        (r#"U&'\0441\043B\043E\043D'"#, "слон"),
        // High unicode code point (> 0xFFFF)
        (r#"U&'\+01F418'"#, "🐘"),
        // Escaped backslash
        (r#"U&'\\'"#, r#"\"#),
        // Escaped single quote
        (r#"U&''''"#, "'"),
    ];

    for (input, expected) in pairs {
        let parsed = pg_and_generic().verified_expr(input);

        if let Expr::Value(ValueWithSpan {
            value: Value::UnicodeStringLiteral(decoded),
            ..
        }) = parsed
        {
            assert_eq!(expected, decoded);
        } else {
            unreachable!()
        }
    }
}

/// Asserts that `sql`, parsed with the PostgreSQL dialect, yields
/// `(foo <arrow_operator> 'bar') = 'spam'`: the arrow expression is the left
/// operand of the `=` comparison.
fn check_arrow_precedence(sql: &str, arrow_operator: BinaryOperator) {
    let parsed = pg().verified_expr(sql);

    let foo = Expr::Identifier(Ident {
        value: "foo".to_string(),
        quote_style: None,
        span: Span::empty(),
    });
    let bar = Expr::Value(Value::SingleQuotedString("bar".to_string()).with_empty_span());
    let spam = Expr::Value(Value::SingleQuotedString("spam".to_string()).with_empty_span());

    // Inner node: `foo <arrow> 'bar'`.
    let arrow_expr = Expr::BinaryOp {
        left: Box::new(foo),
        op: arrow_operator,
        right: Box::new(bar),
    };
    // Outer node: `<arrow_expr> = 'spam'`.
    let expected = Expr::BinaryOp {
        left: Box::new(arrow_expr),
        op: BinaryOperator::Eq,
        right: Box::new(spam),
    };

    assert_eq!(parsed, expected)
}

/// Runs the shared precedence check for the `->` operator.
#[test]
fn arrow_precedence() {
    let sql = "foo -> 'bar' = 'spam'";
    check_arrow_precedence(sql, BinaryOperator::Arrow);
}

/// Runs the shared precedence check for the `->>` operator.
#[test]
fn long_arrow_precedence() {
    let sql = "foo ->> 'bar' = 'spam'";
    check_arrow_precedence(sql, BinaryOperator::LongArrow);
}

/// Checks that in `foo -> 'bar'::TEXT` the cast applies to `'bar'` only and
/// the cast expression becomes the right operand of `->`.
#[test]
fn arrow_cast_precedence() {
    // check this matches postgres where you would need `(foo -> 'bar')::TEXT`
    let parsed = pg().verified_expr("foo -> 'bar'::TEXT");

    let foo = Expr::Identifier(Ident {
        value: "foo".to_string(),
        quote_style: None,
        span: Span::empty(),
    });
    let bar_as_text = Expr::Cast {
        kind: CastKind::DoubleColon,
        expr: Box::new(Expr::Value(
            Value::SingleQuotedString("bar".to_string()).with_empty_span(),
        )),
        data_type: DataType::Text,
        format: None,
    };
    let expected = Expr::BinaryOp {
        left: Box::new(foo),
        op: BinaryOperator::Arrow,
        right: Box::new(bar_as_text),
    };

    assert_eq!(parsed, expected)
}

/// Parses `CREATE TYPE ... AS ENUM (...)` and checks the type name and the
/// list of labels, which are compared as single-quoted identifiers.
#[test]
fn parse_create_type_as_enum() {
    let sql = "CREATE TYPE public.my_type AS ENUM ('label1', 'label2', 'label3', 'label4')";

    let Statement::CreateType {
        name,
        representation: Some(UserDefinedTypeRepresentation::Enum { labels }),
    } = pg_and_generic().verified_stmt(sql)
    else {
        unreachable!("{:?} should parse to Statement::CreateType", sql)
    };

    assert_eq!("public.my_type", name.to_string());

    let expected_labels: Vec<Ident> = ["label1", "label2", "label3", "label4"]
        .iter()
        .map(|label| Ident::with_quote('\'', *label))
        .collect();
    assert_eq!(expected_labels, labels);
}

/// Table-driven test for `ALTER TYPE`: one `RENAME TO` case plus every
/// combination of `ADD VALUE [IF NOT EXISTS] ... [BEFORE | AFTER ...]`.
#[test]
fn parse_alter_type() {
    struct TestCase {
        sql: &'static str,
        name: &'static str,
        operation: AlterTypeOperation,
    }

    // Enum labels are compared as single-quoted identifiers.
    fn label(text: &str) -> Ident {
        Ident::with_quote('\'', text)
    }

    // Expected AST for `ADD VALUE [IF NOT EXISTS] '<value>' [<position>]`.
    fn add_value(
        if_not_exists: bool,
        value: &str,
        position: Option<AlterTypeAddValuePosition>,
    ) -> AlterTypeOperation {
        AlterTypeOperation::AddValue(AlterTypeAddValue {
            if_not_exists,
            value: label(value),
            position,
        })
    }

    let cases = vec![
        TestCase {
            sql: "ALTER TYPE public.my_type RENAME TO my_new_type",
            name: "public.my_type",
            operation: AlterTypeOperation::Rename(AlterTypeRename {
                new_name: Ident::new("my_new_type"),
            }),
        },
        TestCase {
            sql: "ALTER TYPE public.my_type ADD VALUE IF NOT EXISTS 'label3.5' BEFORE 'label4'",
            name: "public.my_type",
            operation: add_value(
                true,
                "label3.5",
                Some(AlterTypeAddValuePosition::Before(label("label4"))),
            ),
        },
        TestCase {
            sql: "ALTER TYPE public.my_type ADD VALUE 'label3.5' BEFORE 'label4'",
            name: "public.my_type",
            operation: add_value(
                false,
                "label3.5",
                Some(AlterTypeAddValuePosition::Before(label("label4"))),
            ),
        },
        TestCase {
            sql: "ALTER TYPE public.my_type ADD VALUE IF NOT EXISTS 'label3.5' AFTER 'label3'",
            name: "public.my_type",
            operation: add_value(
                true,
                "label3.5",
                Some(AlterTypeAddValuePosition::After(label("label3"))),
            ),
        },
        TestCase {
            sql: "ALTER TYPE public.my_type ADD VALUE 'label3.5' AFTER 'label3'",
            name: "public.my_type",
            operation: add_value(
                false,
                "label3.5",
                Some(AlterTypeAddValuePosition::After(label("label3"))),
            ),
        },
        TestCase {
            sql: "ALTER TYPE public.my_type ADD VALUE IF NOT EXISTS 'label5'",
            name: "public.my_type",
            operation: add_value(true, "label5", None),
        },
        TestCase {
            sql: "ALTER TYPE public.my_type ADD VALUE 'label5'",
            name: "public.my_type",
            operation: add_value(false, "label5", None),
        },
    ];

    for (index, tc) in cases.into_iter().enumerate() {
        let Statement::AlterType(AlterType { name, operation }) =
            pg_and_generic().verified_stmt(tc.sql)
        else {
            unreachable!("{:?} should parse to Statement::AlterType", tc.sql);
        };
        assert_eq!(tc.name, name.to_string(), "TestCase[{index}].name");
        assert_eq!(tc.operation, operation, "TestCase[{index}].operation");
    }
}

/// Checks that `B'111'` in a projection parses to a
/// `SingleQuotedByteStringLiteral` holding `111`.
#[test]
fn parse_bitstring_literal() {
    let select = pg_and_generic().verified_only_select("SELECT B'111'");

    let literal = Value::SingleQuotedByteStringLiteral("111".to_string()).with_empty_span();
    let expected = vec![SelectItem::UnnamedExpr(Expr::Value(literal))];

    assert_eq!(select.projection, expected);
}

/// Checks that `VARBIT` and `VARBIT(42)` column types parse to
/// `DataType::VarBit` without and with a length.
#[test]
fn parse_varbit_datatype() {
    let Statement::CreateTable(CreateTable { columns, .. }) =
        pg_and_generic().verified_stmt("CREATE TABLE foo (x VARBIT, y VARBIT(42))")
    else {
        unreachable!()
    };

    let unsized_column = ColumnDef {
        name: "x".into(),
        data_type: DataType::VarBit(None),
        options: vec![],
    };
    let sized_column = ColumnDef {
        name: "y".into(),
        data_type: DataType::VarBit(Some(42)),
        options: vec![],
    };

    assert_eq!(columns, vec![unsized_column, sized_column]);
}

/// Checks `ALTER TABLE ... REPLICA IDENTITY` for the `FULL` and
/// `USING INDEX <name>` forms.
#[test]
fn parse_alter_table_replica_identity() {
    // (SQL, expected replica identity)
    let cases = [
        (
            "ALTER TABLE foo REPLICA IDENTITY FULL",
            ReplicaIdentity::Full,
        ),
        (
            "ALTER TABLE foo REPLICA IDENTITY USING INDEX foo_idx",
            ReplicaIdentity::Index("foo_idx".into()),
        ),
    ];

    for (sql, identity) in cases {
        let Statement::AlterTable(AlterTable { operations, .. }) =
            pg_and_generic().verified_stmt(sql)
        else {
            unreachable!()
        };
        assert_eq!(
            operations,
            vec![AlterTableOperation::ReplicaIdentity { identity }]
        );
    }
}

/// Checks that the `TSVECTOR` and `TSQUERY` column types parse to
/// `DataType::TsVector` and `DataType::TsQuery`.
#[test]
fn parse_ts_datatypes() {
    // (SQL, expected type of column `x`)
    let cases = [
        ("CREATE TABLE foo (x TSVECTOR)", DataType::TsVector),
        ("CREATE TABLE foo (x TSQUERY)", DataType::TsQuery),
    ];

    for (sql, data_type) in cases {
        let Statement::CreateTable(CreateTable { columns, .. }) =
            pg_and_generic().verified_stmt(sql)
        else {
            unreachable!()
        };
        let expected = vec![ColumnDef {
            name: "x".into(),
            data_type,
            options: vec![],
        }];
        assert_eq!(columns, expected);
    }
}

/// Checks that a trailing `NOT VALID` on `ADD CONSTRAINT ... FOREIGN KEY`
/// sets `not_valid` on the resulting `AddConstraint` operation.
#[test]
fn parse_alter_table_constraint_not_valid() {
    let sql =
        "ALTER TABLE foo ADD CONSTRAINT bar FOREIGN KEY (baz) REFERENCES other(ref) NOT VALID";

    let Statement::AlterTable(AlterTable { operations, .. }) =
        pg_and_generic().verified_stmt(sql)
    else {
        unreachable!()
    };

    let foreign_key = ForeignKeyConstraint {
        name: Some("bar".into()),
        index_name: None,
        columns: vec!["baz".into()],
        foreign_table: ObjectName::from(vec!["other".into()]),
        referred_columns: vec!["ref".into()],
        on_delete: None,
        on_update: None,
        match_kind: None,
        characteristics: None,
    };
    let expected = vec![AlterTableOperation::AddConstraint {
        constraint: foreign_key.into(),
        not_valid: true,
    }];

    assert_eq!(operations, expected);
}

/// Checks that `ALTER TABLE ... VALIDATE CONSTRAINT <name>` parses to a single
/// `ValidateConstraint` operation carrying the constraint name.
#[test]
fn parse_alter_table_validate_constraint() {
    let Statement::AlterTable(AlterTable { operations, .. }) =
        pg_and_generic().verified_stmt("ALTER TABLE foo VALIDATE CONSTRAINT bar")
    else {
        unreachable!()
    };

    let expected = vec![AlterTableOperation::ValidateConstraint { name: "bar".into() }];
    assert_eq!(operations, expected);
}

/// Table-driven test for `CREATE SERVER`: the minimal form, the form with
/// `IF NOT EXISTS` / `TYPE` / `VERSION`, and the form with `OPTIONS (...)`.
#[test]
fn parse_create_server() {
    // Identifier carrying a single-quote style, as expected for the
    // TYPE / VERSION strings and for option values.
    fn quoted(text: &str) -> Ident {
        Ident {
            value: text.to_string(),
            quote_style: Some('\''),
            span: Span::empty(),
        }
    }

    // One `key 'value'` entry of the OPTIONS list.
    fn server_option(key: &str, value: &str) -> CreateServerOption {
        CreateServerOption {
            key: key.into(),
            value: quoted(value),
        }
    }

    let minimal = CreateServerStatement {
        name: ObjectName::from(vec!["myserver".into()]),
        if_not_exists: false,
        server_type: None,
        version: None,
        foreign_data_wrapper: ObjectName::from(vec!["postgres_fdw".into()]),
        options: None,
    };

    let typed_and_versioned = CreateServerStatement {
        name: ObjectName::from(vec!["myserver".into()]),
        if_not_exists: true,
        server_type: Some(quoted("server_type")),
        version: Some(quoted("server_version")),
        foreign_data_wrapper: ObjectName::from(vec!["postgres_fdw".into()]),
        options: None,
    };

    let with_options = CreateServerStatement {
        name: ObjectName::from(vec!["myserver2".into()]),
        if_not_exists: false,
        server_type: None,
        version: None,
        foreign_data_wrapper: ObjectName::from(vec!["postgres_fdw".into()]),
        options: Some(vec![
            server_option("host", "foo"),
            server_option("dbname", "foodb"),
            server_option("port", "5432"),
        ]),
    };

    let test_cases = vec![
        (
            "CREATE SERVER myserver FOREIGN DATA WRAPPER postgres_fdw",
            minimal,
        ),
        (
            "CREATE SERVER IF NOT EXISTS myserver TYPE 'server_type' VERSION 'server_version' FOREIGN DATA WRAPPER postgres_fdw",
            typed_and_versioned,
        ),
        (
            "CREATE SERVER myserver2 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host 'foo', dbname 'foodb', port '5432')",
            with_options,
        ),
    ];

    for (sql, expected) in test_cases {
        match pg_and_generic().verified_stmt(sql) {
            Statement::CreateServer(stmt) => assert_eq!(stmt, expected),
            _ => unreachable!(),
        }
    }
}

/// Checks `ALTER SCHEMA ... RENAME TO` and `ALTER SCHEMA ... OWNER TO` with
/// each owner form listed below.
#[test]
fn parse_alter_schema() {
    // Test RENAME operation
    let Statement::AlterSchema(AlterSchema { operations, .. }) =
        pg_and_generic().verified_stmt("ALTER SCHEMA foo RENAME TO bar")
    else {
        unreachable!()
    };
    let expected_rename = vec![AlterSchemaOperation::Rename {
        name: ObjectName::from(vec!["bar".into()]),
    }];
    assert_eq!(operations, expected_rename);

    // Test OWNER TO operations with different owner types
    let owners = [
        ("bar", Owner::Ident("bar".into())),
        ("CURRENT_ROLE", Owner::CurrentRole),
        ("CURRENT_USER", Owner::CurrentUser),
        ("SESSION_USER", Owner::SessionUser),
    ];
    for (owner_clause, owner) in owners {
        let sql = format!("ALTER SCHEMA foo OWNER TO {owner_clause}");
        let Statement::AlterSchema(AlterSchema { operations, .. }) =
            pg_and_generic().verified_stmt(&sql)
        else {
            unreachable!()
        };
        assert_eq!(operations, vec![AlterSchemaOperation::OwnerTo { owner }]);
    }
}

/// Checks that `MATCH FULL | SIMPLE | PARTIAL` is recorded in `match_kind`
/// for both column-level and table-level foreign keys.
#[test]
fn parse_foreign_key_match() {
    let test_cases = [
        ("MATCH FULL", ConstraintReferenceMatchKind::Full),
        ("MATCH SIMPLE", ConstraintReferenceMatchKind::Simple),
        ("MATCH PARTIAL", ConstraintReferenceMatchKind::Partial),
    ];

    for (match_clause, expected_kind) in test_cases {
        // Test column-level foreign key
        let column_sql =
            format!("CREATE TABLE t (id INT REFERENCES other_table (id) {match_clause})");
        let Statement::CreateTable(CreateTable { columns, .. }) =
            pg_and_generic().verified_stmt(&column_sql)
        else {
            unreachable!("{:?} should parse to Statement::CreateTable", column_sql)
        };
        let ColumnOption::ForeignKey(column_fk) = &columns[0].options[0].option else {
            panic!("Expected ColumnOption::ForeignKey")
        };
        assert_eq!(column_fk.match_kind, Some(expected_kind));

        // Test table-level foreign key constraint
        let table_sql = format!(
            "CREATE TABLE t (id INT, FOREIGN KEY (id) REFERENCES other_table(id) {match_clause})"
        );
        let Statement::CreateTable(CreateTable { constraints, .. }) =
            pg_and_generic().verified_stmt(&table_sql)
        else {
            unreachable!("{:?} should parse to Statement::CreateTable", table_sql)
        };
        let TableConstraint::ForeignKey(table_fk) = &constraints[0] else {
            panic!("Expected TableConstraint::ForeignKey")
        };
        assert_eq!(table_fk.match_kind, Some(expected_kind));
    }
}

/// Round-trips a `CREATE TABLE` whose column-level and table-level foreign
/// keys combine a `MATCH` clause with `ON DELETE` / `ON UPDATE` actions.
#[test]
fn parse_foreign_key_match_with_actions() {
    // Line continuations strip the following indentation, so this is a
    // single-line statement.
    let sql = "CREATE TABLE orders (\
        order_id INT REFERENCES another_table (id) MATCH FULL ON DELETE CASCADE ON UPDATE RESTRICT, \
        customer_id INT, \
        CONSTRAINT fk_customer FOREIGN KEY (customer_id) REFERENCES customers(customer_id) \
        MATCH SIMPLE ON DELETE SET NULL ON UPDATE CASCADE)";

    pg_and_generic().verified_stmt(sql);
}

/// Covers `CREATE OPERATOR` under the PostgreSQL dialect: the full option
/// list, bare operator symbols, `!=` normalization, schema-qualified names,
/// the `OPERATOR(...)` wrapper, and a set of inputs that must be rejected.
#[test]
fn parse_create_operator() {
    // Builds an `ObjectName` out of unquoted name parts.
    fn object_name(parts: &[&str]) -> ObjectName {
        ObjectName::from(
            parts
                .iter()
                .map(|part| Ident::new(*part))
                .collect::<Vec<Ident>>(),
        )
    }

    // Every clause at once, on a schema-qualified operator.
    let sql = "CREATE OPERATOR myschema.@@ (PROCEDURE = myschema.my_proc, LEFTARG = TIMESTAMP WITH TIME ZONE, RIGHTARG = VARCHAR(255), COMMUTATOR = schema.>, NEGATOR = schema.<=, RESTRICT = myschema.sel_func, JOIN = myschema.join_func, HASHES, MERGES)";
    let expected_full = Statement::CreateOperator(CreateOperator {
        name: object_name(&["myschema", "@@"]),
        function: object_name(&["myschema", "my_proc"]),
        is_procedure: true,
        left_arg: Some(DataType::Timestamp(None, TimezoneInfo::WithTimeZone)),
        right_arg: Some(DataType::Varchar(Some(CharacterLength::IntegerLength {
            length: 255,
            unit: None,
        }))),
        options: vec![
            OperatorOption::Commutator(object_name(&["schema", ">"])),
            OperatorOption::Negator(object_name(&["schema", "<="])),
            OperatorOption::Restrict(Some(object_name(&["myschema", "sel_func"]))),
            OperatorOption::Join(Some(object_name(&["myschema", "join_func"]))),
            OperatorOption::Hashes,
            OperatorOption::Merges,
        ],
    });
    assert_eq!(pg().verified_stmt(sql), expected_full);

    // Bare operator symbols with only the mandatory FUNCTION clause.
    let symbols = [
        "-", "*", "/", "<", ">", "=", "<=", ">=", "<>", "~", "!", "@", "#", "%", "^", "&", "|",
        "<<", ">>", "&&",
    ];
    for op_symbol in symbols {
        let symbol_sql = format!("CREATE OPERATOR {op_symbol} (FUNCTION = f)");
        let expected = Statement::CreateOperator(CreateOperator {
            name: object_name(&[op_symbol]),
            function: object_name(&["f"]),
            is_procedure: false,
            left_arg: None,
            right_arg: None,
            options: vec![],
        });
        assert_eq!(pg().verified_stmt(&symbol_sql), expected);
    }

    // `!=` is serialized back as `<>`.
    pg().one_statement_parses_to(
        "CREATE OPERATOR != (FUNCTION = func)",
        "CREATE OPERATOR <> (FUNCTION = func)",
    );

    // Schema-qualified operator names.
    let qualified_cases = [
        ("s1.+", object_name(&["s1", "+"])),
        ("s2.-", object_name(&["s2", "-"])),
        ("s1.s3.*", object_name(&["s1", "s3", "*"])),
    ];
    for (qualified, expected_name) in qualified_cases {
        let Statement::CreateOperator(CreateOperator { name, options, .. }) =
            pg().verified_stmt(&format!("CREATE OPERATOR {qualified} (FUNCTION = f)"))
        else {
            unreachable!()
        };
        assert_eq!(name, expected_name);
        assert!(options.is_empty());
    }

    // The `OPERATOR(...)` wrapper is dropped when the statement is serialized.
    pg().one_statement_parses_to(
        "CREATE OPERATOR + (FUNCTION = f, COMMUTATOR = OPERATOR(>), NEGATOR = OPERATOR(>=))",
        "CREATE OPERATOR + (FUNCTION = f, COMMUTATOR = >, NEGATOR = >=)",
    );

    // Test all duplicate clause errors
    let duplicated_clauses = [
        "FUNCTION = f2",
        "PROCEDURE = p",
        "LEFTARG = INT4, LEFTARG = INT4",
        "RIGHTARG = INT4, RIGHTARG = INT4",
        "COMMUTATOR = -, COMMUTATOR = *",
        "NEGATOR = -, NEGATOR = *",
        "RESTRICT = f1, RESTRICT = f2",
        "JOIN = f1, JOIN = f2",
        "HASHES, HASHES",
        "MERGES, MERGES",
    ];
    for field in duplicated_clauses {
        assert!(pg()
            .parse_sql_statements(&format!("CREATE OPERATOR + (FUNCTION = f, {field})"))
            .is_err());
    }

    // Test missing FUNCTION/PROCEDURE error
    assert!(pg()
        .parse_sql_statements("CREATE OPERATOR + (LEFTARG = INT4)")
        .is_err());

    // Test empty parameter list error
    assert!(pg().parse_sql_statements("CREATE OPERATOR + ()").is_err());

    // Test nested empty parentheses error
    assert!(pg().parse_sql_statements("CREATE OPERATOR > (()").is_err());
    assert!(pg().parse_sql_statements("CREATE OPERATOR > ())").is_err());
}

/// Covers `DROP OPERATOR`: a prefix operator with `NONE` as left type, every
/// combination of `IF EXISTS` / drop behavior for a list of binary operators,
/// schema-qualified names, multiple operators, and inputs with no operator
/// name that must be rejected.
#[test]
fn parse_drop_operator() {
    // Signature of operator `op` taking two INTEGER operands.
    fn integer_signature(op: &str) -> DropOperatorSignature {
        DropOperatorSignature {
            name: ObjectName::from(vec![Ident::new(op)]),
            left_type: Some(DataType::Integer(None)),
            right_type: DataType::Integer(None),
        }
    }

    // Test DROP OPERATOR with NONE for prefix operator
    let sql = "DROP OPERATOR ~ (NONE, BIT)";
    let expected_prefix = Statement::DropOperator(DropOperator {
        if_exists: false,
        operators: vec![DropOperatorSignature {
            name: ObjectName::from(vec![Ident::new("~")]),
            left_type: None,
            right_type: DataType::Bit(None),
        }],
        drop_behavior: None,
    });
    assert_eq!(pg_and_generic().verified_stmt(sql), expected_prefix);

    for if_exist in [true, false] {
        let if_exists_clause = if if_exist { " IF EXISTS" } else { "" };
        for cascading in [
            None,
            Some(DropBehavior::Cascade),
            Some(DropBehavior::Restrict),
        ] {
            // Empty when no drop behavior is requested.
            let behavior_clause = cascading
                .map(|behavior| format!(" {behavior}"))
                .unwrap_or_default();
            for op in ["<", ">", "<=", ">=", "<>", "||", "&&", "<<", ">>"] {
                let sql = format!(
                    "DROP OPERATOR{if_exists_clause} {op} (INTEGER, INTEGER){behavior_clause}"
                );
                let expected = Statement::DropOperator(DropOperator {
                    if_exists: if_exist,
                    operators: vec![integer_signature(op)],
                    drop_behavior: cascading,
                });
                assert_eq!(pg_and_generic().verified_stmt(&sql), expected);
            }
        }
    }

    // Test DROP OPERATOR with schema-qualified operator name
    let sql = "DROP OPERATOR myschema.@@ (TEXT, TEXT)";
    let expected_qualified = Statement::DropOperator(DropOperator {
        if_exists: false,
        operators: vec![DropOperatorSignature {
            name: ObjectName::from(vec![Ident::new("myschema"), Ident::new("@@")]),
            left_type: Some(DataType::Text),
            right_type: DataType::Text,
        }],
        drop_behavior: None,
    });
    assert_eq!(pg_and_generic().verified_stmt(sql), expected_qualified);

    // Test DROP OPERATOR with multiple operators, IF EXISTS and CASCADE
    let sql = "DROP OPERATOR IF EXISTS + (INTEGER, INTEGER), - (INTEGER, INTEGER) CASCADE";
    let expected_multiple = Statement::DropOperator(DropOperator {
        if_exists: true,
        operators: vec![integer_signature("+"), integer_signature("-")],
        drop_behavior: Some(DropBehavior::Cascade),
    });
    assert_eq!(pg_and_generic().verified_stmt(sql), expected_multiple);

    // Test error: DROP OPERATOR with no operators
    let sql = "DROP OPERATOR (INTEGER, INTEGER)";
    assert!(pg().parse_sql_statements(sql).is_err());

    // Test error: DROP OPERATOR IF EXISTS with no operators
    let sql = "DROP OPERATOR IF EXISTS (INTEGER, INTEGER)";
    assert!(pg().parse_sql_statements(sql).is_err());
}

/// Covers `ALTER OPERATOR` for the PostgreSQL and generic dialects:
/// `OWNER TO` with each owner form, `SET SCHEMA`, and `SET (...)` with the
/// `RESTRICT`, `JOIN`, `COMMUTATOR`, `NEGATOR`, `HASHES` and `MERGES` options.
///
/// Each case round-trips the SQL through `verified_stmt` and compares the
/// result with a hand-built `AlterOperator` statement.
#[test]
fn parse_alter_operator() {
    use sqlparser::ast::{AlterOperator, AlterOperatorOperation, OperatorOption, Owner};

    // Test ALTER OPERATOR ... OWNER TO with different owner types
    for (owner_sql, owner_ast) in [
        ("joe", Owner::Ident(Ident::new("joe"))),
        ("CURRENT_USER", Owner::CurrentUser),
        ("CURRENT_ROLE", Owner::CurrentRole),
        ("SESSION_USER", Owner::SessionUser),
    ] {
        // The signature table is rebuilt for every owner and iterated by
        // value, so its elements can be moved straight into the expected AST.
        for (op_name, op_name_ast, left_type_sql, left_type_ast, right_type_sql, right_type_ast) in [
            (
                "+",
                ObjectName::from(vec![Ident::new("+")]),
                "INTEGER",
                Some(DataType::Integer(None)),
                "INTEGER",
                DataType::Integer(None),
            ),
            (
                "~",
                ObjectName::from(vec![Ident::new("~")]),
                "NONE",
                None,
                "BIT",
                DataType::Bit(None),
            ),
            (
                "@@",
                ObjectName::from(vec![Ident::new("@@")]),
                "TEXT",
                Some(DataType::Text),
                "TEXT",
                DataType::Text,
            ),
        ] {
            let sql = format!(
                "ALTER OPERATOR {op_name} ({left_type_sql}, {right_type_sql}) OWNER TO {owner_sql}"
            );
            assert_eq!(
                pg_and_generic().verified_stmt(&sql),
                Statement::AlterOperator(AlterOperator {
                    name: op_name_ast,
                    left_type: left_type_ast,
                    right_type: right_type_ast,
                    // The owner is shared by every signature of this outer
                    // iteration, so it still has to be cloned.
                    operation: AlterOperatorOperation::OwnerTo(owner_ast.clone()),
                })
            );
        }
    }

    // Test ALTER OPERATOR ... SET SCHEMA
    for (op_name, op_name_ast, schema_name, schema_name_ast) in [
        (
            "+",
            ObjectName::from(vec![Ident::new("+")]),
            "new_schema",
            ObjectName::from(vec![Ident::new("new_schema")]),
        ),
        (
            "myschema.@@",
            ObjectName::from(vec![Ident::new("myschema"), Ident::new("@@")]),
            "other_schema",
            ObjectName::from(vec![Ident::new("other_schema")]),
        ),
    ] {
        let sql = format!("ALTER OPERATOR {op_name} (TEXT, TEXT) SET SCHEMA {schema_name}");
        assert_eq!(
            pg_and_generic().verified_stmt(&sql),
            Statement::AlterOperator(AlterOperator {
                name: op_name_ast,
                left_type: Some(DataType::Text),
                right_type: DataType::Text,
                operation: AlterOperatorOperation::SetSchema {
                    schema_name: schema_name_ast,
                },
            })
        );
    }

    // Test ALTER OPERATOR ... SET with RESTRICT and JOIN
    // (`NONE` is expected to produce `None` for the corresponding option).
    for (restrict_val, restrict_ast, join_val, join_ast) in [
        (
            "_int_contsel",
            Some(ObjectName::from(vec![Ident::new("_int_contsel")])),
            "_int_contjoinsel",
            Some(ObjectName::from(vec![Ident::new("_int_contjoinsel")])),
        ),
        (
            "NONE",
            None,
            "my_joinsel",
            Some(ObjectName::from(vec![Ident::new("my_joinsel")])),
        ),
        (
            "my_sel",
            Some(ObjectName::from(vec![Ident::new("my_sel")])),
            "NONE",
            None,
        ),
    ] {
        let sql = format!(
            "ALTER OPERATOR && (TEXT, TEXT) SET (RESTRICT = {restrict_val}, JOIN = {join_val})"
        );
        assert_eq!(
            pg_and_generic().verified_stmt(&sql),
            Statement::AlterOperator(AlterOperator {
                name: ObjectName::from(vec![Ident::new("&&")]),
                left_type: Some(DataType::Text),
                right_type: DataType::Text,
                operation: AlterOperatorOperation::Set {
                    options: vec![
                        OperatorOption::Restrict(restrict_ast),
                        OperatorOption::Join(join_ast),
                    ],
                },
            })
        );
    }

    // Test ALTER OPERATOR ... SET with COMMUTATOR and NEGATOR
    for (operator, commutator, negator) in [("&&", "&&", ">"), ("+", "+", "-"), ("<", "<", ">=")] {
        let sql = format!(
            "ALTER OPERATOR {operator} (INTEGER, INTEGER) SET (COMMUTATOR = {commutator}, NEGATOR = {negator})"
        );
        assert_eq!(
            pg_and_generic().verified_stmt(&sql),
            Statement::AlterOperator(AlterOperator {
                name: ObjectName::from(vec![Ident::new(operator)]),
                left_type: Some(DataType::Integer(None)),
                right_type: DataType::Integer(None),
                operation: AlterOperatorOperation::Set {
                    options: vec![
                        OperatorOption::Commutator(ObjectName::from(vec![Ident::new(commutator)])),
                        OperatorOption::Negator(ObjectName::from(vec![Ident::new(negator)])),
                    ],
                },
            })
        );
    }

    // Test ALTER OPERATOR ... SET with HASHES and MERGES (individually and combined)
    for (operator, options_sql, options_ast) in [
        ("=", "HASHES", vec![OperatorOption::Hashes]),
        ("<", "MERGES", vec![OperatorOption::Merges]),
        (
            "<=",
            "HASHES, MERGES",
            vec![OperatorOption::Hashes, OperatorOption::Merges],
        ),
    ] {
        let sql = format!("ALTER OPERATOR {operator} (INTEGER, INTEGER) SET ({options_sql})");
        assert_eq!(
            pg_and_generic().verified_stmt(&sql),
            Statement::AlterOperator(AlterOperator {
                name: ObjectName::from(vec![Ident::new(operator)]),
                left_type: Some(DataType::Integer(None)),
                right_type: DataType::Integer(None),
                operation: AlterOperatorOperation::Set {
                    options: options_ast
                },
            })
        );
    }

    // Test ALTER OPERATOR ... SET with multiple options combined
    let sql =
        "ALTER OPERATOR + (INTEGER, INTEGER) SET (COMMUTATOR = +, NEGATOR = -, HASHES, MERGES)";
    assert_eq!(
        pg_and_generic().verified_stmt(sql),
        Statement::AlterOperator(AlterOperator {
            name: ObjectName::from(vec![Ident::new("+")]),
            left_type: Some(DataType::Integer(None)),
            right_type: DataType::Integer(None),
            operation: AlterOperatorOperation::Set {
                options: vec![
                    OperatorOption::Commutator(ObjectName::from(vec![Ident::new("+")])),
                    OperatorOption::Negator(ObjectName::from(vec![Ident::new("-")])),
                    OperatorOption::Hashes,
                    OperatorOption::Merges,
                ],
            },
        })
    );
}

/// Parses `DROP OPERATOR FAMILY` for every combination of `IF EXISTS`, drop
/// behavior, index method and name list, comparing each result with the
/// expected AST, and checks that statements without a name list fail to parse.
#[test]
fn parse_drop_operator_family() {
    // Builds an object name out of its individual identifier parts.
    fn object_name(parts: &[&str]) -> ObjectName {
        let idents: Vec<Ident> = parts.iter().map(|part| Ident::new(*part)).collect();
        ObjectName::from(idents)
    }

    for if_exists in [true, false] {
        let if_exists_clause = if if_exists { " IF EXISTS" } else { "" };

        for drop_behavior in [
            None,
            Some(DropBehavior::Cascade),
            Some(DropBehavior::Restrict),
        ] {
            // Empty when no behavior is requested, otherwise the behavior
            // preceded by a single space.
            let behavior_clause = drop_behavior
                .map(|behavior| format!(" {behavior}"))
                .unwrap_or_default();

            for index_method in ["btree", "hash", "gist", "gin", "spgist", "brin"] {
                for (names_sql, expected_names) in [
                    ("float_ops", vec![object_name(&["float_ops"])]),
                    (
                        "myschema.custom_ops",
                        vec![object_name(&["myschema", "custom_ops"])],
                    ),
                    (
                        "ops1, ops2, schema.ops3",
                        vec![
                            object_name(&["ops1"]),
                            object_name(&["ops2"]),
                            object_name(&["schema", "ops3"]),
                        ],
                    ),
                ] {
                    let sql = format!(
                        "DROP OPERATOR FAMILY{if_exists_clause} {names_sql} USING {index_method}{behavior_clause}"
                    );
                    let expected = Statement::DropOperatorFamily(DropOperatorFamily {
                        if_exists,
                        names: expected_names,
                        using: Ident::new(index_method),
                        drop_behavior,
                    });
                    assert_eq!(pg_and_generic().verified_stmt(&sql), expected);
                }
            }
        }
    }

    // A name list is mandatory, with or without IF EXISTS.
    for sql in [
        "DROP OPERATOR FAMILY USING btree",
        "DROP OPERATOR FAMILY IF EXISTS USING btree",
    ] {
        assert!(pg_and_generic().parse_sql_statements(sql).is_err());
    }
}

/// Parses `DROP OPERATOR CLASS` for every combination of `IF EXISTS`, drop
/// behavior, index method and name list, comparing each result with the
/// expected AST, and checks that statements without a name list fail to parse.
#[test]
fn parse_drop_operator_class() {
    for if_exists in [true, false] {
        for drop_behavior in [
            None,
            Some(DropBehavior::Cascade),
            Some(DropBehavior::Restrict),
        ] {
            for index_method in &["btree", "hash", "gist", "gin", "spgist", "brin"] {
                for (names_str, names_vec) in [
                    (
                        "widget_ops",
                        vec![ObjectName::from(vec![Ident::new("widget_ops")])],
                    ),
                    (
                        "myschema.int4_ops",
                        vec![ObjectName::from(vec![
                            Ident::new("myschema"),
                            Ident::new("int4_ops"),
                        ])],
                    ),
                    (
                        "ops1, ops2, schema.ops3",
                        vec![
                            ObjectName::from(vec![Ident::new("ops1")]),
                            ObjectName::from(vec![Ident::new("ops2")]),
                            ObjectName::from(vec![Ident::new("schema"), Ident::new("ops3")]),
                        ],
                    ),
                ] {
                    let sql = format!(
                        "DROP OPERATOR CLASS{} {} USING {}{}",
                        if if_exists { " IF EXISTS" } else { "" },
                        names_str,
                        index_method,
                        match drop_behavior {
                            Some(behavior) => format!(" {behavior}"),
                            None => String::new(),
                        }
                    );
                    assert_eq!(
                        pg_and_generic().verified_stmt(&sql),
                        Statement::DropOperatorClass(DropOperatorClass {
                            if_exists,
                            // Each iteration owns its name list and never reads it
                            // again, so it is moved into the expected AST rather
                            // than cloned.
                            names: names_vec,
                            using: Ident::new(*index_method),
                            drop_behavior,
                        })
                    );
                }
            }
        }
    }

    // Test error: DROP OPERATOR CLASS with no names
    let sql = "DROP OPERATOR CLASS USING btree";
    assert!(pg_and_generic().parse_sql_statements(sql).is_err());

    // Test error: DROP OPERATOR CLASS IF EXISTS with no names
    let sql = "DROP OPERATOR CLASS IF EXISTS USING btree";
    assert!(pg_and_generic().parse_sql_statements(sql).is_err());
}

/// Parses `CREATE OPERATOR FAMILY` with a plain and a schema-qualified family
/// name for each index method and compares the result with the expected AST.
#[test]
fn parse_create_operator_family() {
    for index_method in ["btree", "hash", "gist", "gin", "spgist", "brin"] {
        for (family_sql, family_name) in [
            ("my_family", ObjectName::from(vec![Ident::new("my_family")])),
            (
                "myschema.test_family",
                ObjectName::from(vec![Ident::new("myschema"), Ident::new("test_family")]),
            ),
        ] {
            let sql = format!("CREATE OPERATOR FAMILY {family_sql} USING {index_method}");
            let expected = Statement::CreateOperatorFamily(CreateOperatorFamily {
                name: family_name,
                using: Ident::new(index_method),
            });
            assert_eq!(pg_and_generic().verified_stmt(&sql), expected);
        }
    }
}

/// Exercises `CREATE OPERATOR CLASS` parsing: the `DEFAULT` flag, the optional
/// `FAMILY` clause, and `OPERATOR`, `FUNCTION` and `STORAGE` items, finishing
/// with an input that must be rejected.
#[test]
fn parse_create_operator_class() {
    // Unwraps a parsed statement, failing the test for any other statement kind.
    fn expect_operator_class(stmt: Statement) -> CreateOperatorClass {
        match stmt {
            Statement::CreateOperatorClass(create) => create,
            _ => panic!("Expected CreateOperatorClass statement"),
        }
    }

    // Builds an object name out of its individual identifier parts.
    fn object_name(parts: &[&str]) -> ObjectName {
        let idents: Vec<Ident> = parts.iter().map(|part| Ident::new(*part)).collect();
        ObjectName::from(idents)
    }

    // Every combination of the DEFAULT flag, the FAMILY clause and a plain or
    // schema-qualified class name.
    for (is_default, default_clause) in [(false, ""), (true, "DEFAULT ")] {
        for (has_family, family_clause) in [(false, ""), (true, " FAMILY int4_family")] {
            for (class_name, expected_name) in [
                ("int4_ops", object_name(&["int4_ops"])),
                ("myschema.test_ops", object_name(&["myschema", "test_ops"])),
            ] {
                let sql = format!(
                    "CREATE OPERATOR CLASS {class_name} {default_clause}FOR TYPE INT4 USING btree{family_clause} AS OPERATOR 1 <"
                );
                let create = expect_operator_class(pg_and_generic().verified_stmt(&sql));
                let expected_family = has_family.then(|| object_name(&["int4_family"]));

                assert_eq!(create.name, expected_name);
                assert_eq!(create.default, is_default);
                assert_eq!(create.for_type, DataType::Int4(None));
                assert_eq!(create.using, Ident::new("btree"));
                assert_eq!(create.family, expected_family);
                assert_eq!(create.items.len(), 1);
            }
        }
    }

    // Comprehensive operator class with all fields populated.
    let sql = "CREATE OPERATOR CLASS CAS_btree_ops DEFAULT FOR TYPE CAS USING btree FAMILY CAS_btree_ops AS OPERATOR 1 <, OPERATOR 2 <=, OPERATOR 3 =, OPERATOR 4 >=, OPERATOR 5 >, FUNCTION 1 cas_cmp(CAS, CAS)";
    match expect_operator_class(pg_and_generic().verified_stmt(sql)) {
        CreateOperatorClass {
            name,
            default: true,
            for_type,
            using,
            family,
            items,
        } => {
            assert_eq!(name, object_name(&["CAS_btree_ops"]));
            assert_eq!(for_type, DataType::Custom(object_name(&["CAS"]), vec![]));
            assert_eq!(using, Ident::new("btree"));
            assert_eq!(family, Some(object_name(&["CAS_btree_ops"])));
            assert_eq!(items.len(), 6);
        }
        _ => panic!("Expected CreateOperatorClass statement"),
    }

    // Operator item carrying explicit argument types.
    let sql =
        "CREATE OPERATOR CLASS test_ops FOR TYPE INT4 USING gist AS OPERATOR 1 < (INT4, INT4)";
    let create = expect_operator_class(pg_and_generic().verified_stmt(sql));
    assert_eq!(create.items.len(), 1);
    match &create.items[0] {
        OperatorClassItem::Operator {
            strategy_number: 1,
            operator_name,
            op_types:
                Some(OperatorArgTypes {
                    left: DataType::Int4(None),
                    right: DataType::Int4(None),
                }),
            purpose: None,
        } => assert_eq!(operator_name, &object_name(&["<"])),
        _ => panic!("Expected Operator item with arg types"),
    }

    // Operator item with FOR SEARCH.
    let sql = "CREATE OPERATOR CLASS test_ops FOR TYPE INT4 USING gist AS OPERATOR 1 < FOR SEARCH";
    let create = expect_operator_class(pg_and_generic().verified_stmt(sql));
    assert_eq!(create.items.len(), 1);
    match &create.items[0] {
        OperatorClassItem::Operator {
            strategy_number: 1,
            operator_name,
            op_types: None,
            purpose: Some(OperatorPurpose::ForSearch),
        } => assert_eq!(operator_name, &object_name(&["<"])),
        _ => panic!("Expected Operator item FOR SEARCH"),
    }

    // Operator item with FOR ORDER BY (checked against the PostgreSQL dialect only).
    let sql = "CREATE OPERATOR CLASS test_ops FOR TYPE INT4 USING gist AS OPERATOR 2 <<-> FOR ORDER BY float_ops";
    let create = expect_operator_class(pg().verified_stmt(sql));
    assert_eq!(create.items.len(), 1);
    match &create.items[0] {
        OperatorClassItem::Operator {
            strategy_number: 2,
            operator_name,
            op_types: None,
            purpose: Some(OperatorPurpose::ForOrderBy { sort_family }),
        } => {
            assert_eq!(operator_name, &object_name(&["<<->"]));
            assert_eq!(sort_family, &object_name(&["float_ops"]));
        }
        _ => panic!("Expected Operator item FOR ORDER BY"),
    }

    // Function item carrying operator class argument types.
    let sql = "CREATE OPERATOR CLASS test_ops FOR TYPE INT4 USING btree AS FUNCTION 1 (INT4, INT4) btcmp(INT4, INT4)";
    let create = expect_operator_class(pg_and_generic().verified_stmt(sql));
    assert_eq!(create.items.len(), 1);
    match &create.items[0] {
        OperatorClassItem::Function {
            support_number: 1,
            op_types: Some(_),
            function_name,
            argument_types,
        } => {
            assert_eq!(function_name, &object_name(&["btcmp"]));
            assert_eq!(argument_types.len(), 2);
        }
        _ => panic!("Expected Function item with op_types"),
    }

    // Function with no arguments (empty parentheses normalizes to no parentheses).
    pg_and_generic().one_statement_parses_to(
        "CREATE OPERATOR CLASS test_ops FOR TYPE INT4 USING btree AS FUNCTION 1 my_func()",
        "CREATE OPERATOR CLASS test_ops FOR TYPE INT4 USING btree AS FUNCTION 1 my_func",
    );
    let sql = "CREATE OPERATOR CLASS test_ops FOR TYPE INT4 USING btree AS FUNCTION 1 my_func";
    let create = expect_operator_class(pg_and_generic().verified_stmt(sql));
    assert_eq!(create.items.len(), 1);
    match &create.items[0] {
        OperatorClassItem::Function {
            support_number: 1,
            op_types: None,
            function_name,
            argument_types,
        } => {
            assert_eq!(function_name, &object_name(&["my_func"]));
            assert_eq!(argument_types.len(), 0);
        }
        _ => panic!("Expected Function item without op_types and no arguments"),
    }

    // Multiple items, including STORAGE.
    let sql = "CREATE OPERATOR CLASS gist_ops FOR TYPE geometry USING gist AS OPERATOR 1 <<, FUNCTION 1 gist_consistent(internal, geometry, INT4), STORAGE box";
    let create = expect_operator_class(pg_and_generic().verified_stmt(sql));
    assert_eq!(create.items.len(), 3);
    // Operator item comes first.
    match &create.items[0] {
        OperatorClassItem::Operator {
            strategy_number: 1,
            operator_name,
            ..
        } => assert_eq!(operator_name, &object_name(&["<<"])),
        _ => panic!("Expected Operator item"),
    }
    // Then the function item.
    match &create.items[1] {
        OperatorClassItem::Function {
            support_number: 1,
            function_name,
            argument_types,
            ..
        } => {
            assert_eq!(function_name, &object_name(&["gist_consistent"]));
            assert_eq!(argument_types.len(), 3);
        }
        _ => panic!("Expected Function item"),
    }
    // And finally the storage item.
    match &create.items[2] {
        OperatorClassItem::Storage { storage_type } => {
            assert_eq!(
                storage_type,
                &DataType::Custom(object_name(&["box"]), vec![])
            );
        }
        _ => panic!("Expected Storage item"),
    }

    // Nested empty parentheses in the function arguments must be rejected.
    let sql = "CREATE OPERATOR CLASS test_ops FOR TYPE INT4 USING btree AS FUNCTION 1 cas_cmp(()";
    assert!(pg().parse_sql_statements(sql).is_err());
}
